diff --git a/.env.example b/.env.example index f3d50f3..2204842 100644 --- a/.env.example +++ b/.env.example @@ -16,6 +16,16 @@ CLAUDE_CODE_OAUTH_TOKEN= ANTHROPIC_API_KEY= CLAUDE_MODEL= CLAUDE_CODE_PATH= +CLAUDE_MAX_TURNS=2 +# Claude binary observability: off | stdout | file | both +CLAUDE_OBSERVABILITY_MODE=off +# CLAUDE_OBSERVABILITY_VERBOSITY: summary | full +CLAUDE_OBSERVABILITY_VERBOSITY=summary +# Relative to repository workspace root in UI/provider runs. +CLAUDE_OBSERVABILITY_LOG_PATH=.ai_ops/events/claude-trace.ndjson +CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL=false +CLAUDE_OBSERVABILITY_DEBUG=false +CLAUDE_OBSERVABILITY_DEBUG_LOG_PATH= # Agent management limits AGENT_MAX_CONCURRENT=4 @@ -28,6 +38,7 @@ AGENT_PROJECT_CONTEXT_PATH=.ai_ops/project-context.json AGENT_TOPOLOGY_MAX_DEPTH=4 AGENT_TOPOLOGY_MAX_RETRIES=2 AGENT_RELATIONSHIP_MAX_CHILDREN=4 +AGENT_MERGE_CONFLICT_MAX_ATTEMPTS=2 # Resource provisioning (hard + soft constraints) AGENT_WORKTREE_ROOT=.ai_ops/worktrees @@ -42,7 +53,7 @@ AGENT_PORT_LOCK_DIR=.ai_ops/locks/ports AGENT_DISCOVERY_FILE_RELATIVE_PATH=.agent-context/resources.json # Security middleware -# AGENT_SECURITY_VIOLATION_MODE: hard_abort | validation_fail +# AGENT_SECURITY_VIOLATION_MODE: hard_abort | validation_fail | dangerous_warn_only AGENT_SECURITY_VIOLATION_MODE=hard_abort AGENT_SECURITY_ALLOWED_BINARIES=git,npm,node,cat,ls,pwd,echo,bash,sh AGENT_SECURITY_COMMAND_TIMEOUT_MS=120000 diff --git a/.gitignore b/.gitignore index 736c4c3..5250936 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ dist mcp.config.json .ai_ops .agent-context +.workspace \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md index 0d6f3ec..3ce1edf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -29,6 +29,7 @@ - `AGENT_TOPOLOGY_MAX_DEPTH` - `AGENT_TOPOLOGY_MAX_RETRIES` - `AGENT_RELATIONSHIP_MAX_CHILDREN` + - `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS` - Provisioning/resource controls: - `AGENT_WORKTREE_ROOT` - `AGENT_WORKTREE_BASE_REF` diff --git a/README.md 
b/README.md index 82e98d9..f7b9e6d 100644 --- a/README.md +++ b/README.md @@ -20,10 +20,13 @@ TypeScript runtime for deterministic multi-agent execution with: - Runtime events are emitted as best-effort side-channel telemetry and do not affect orchestration control flow. - `AgentManager` is an internal utility used by the pipeline when fan-out/retry-unrolled behavior is required. - Session state is persisted under `AGENT_STATE_ROOT`. -- Project state is persisted under `AGENT_PROJECT_CONTEXT_PATH` with schema-versioned JSON (`schemaVersion`) and domains: +- Session lifecycle is explicit (`POST /api/sessions`, `POST /api/sessions/:id/run`, `POST /api/sessions/:id/close`) and each session is bound to a target project path. +- Session project context is persisted as schema-versioned JSON (`schemaVersion`) with domains: - `globalFlags` - `artifactPointers` - `taskQueue` + - each task record stores `taskId`, status, and optional `worktreePath` for task-scoped workspace ownership + - conflict-aware statuses are supported (`conflict`, `resolving_conflict`) ## Deep Dives @@ -95,6 +98,7 @@ The UI provides: - graph visualizer with topology/retry rendering, edge trigger labels, node economics (duration/cost/tokens), and critical-path highlighting - node inspector with attempt metadata and injected `ResolvedExecutionContext` sandbox payload - live runtime event feed from `AGENT_RUNTIME_EVENT_LOG_PATH` with severity coloring (including security mirror events) +- Claude trace feed from `CLAUDE_OBSERVABILITY_LOG_PATH` (query lifecycle, SDK message types/subtypes, and errors) - run trigger + kill switch backed by `SchemaDrivenExecutionEngine.runSession(...)` - run mode selector: `provider` (real Codex/Claude execution) or `mock` (deterministic dry-run executor) - provider selector: `codex` or `claude` @@ -108,6 +112,9 @@ Provider mode notes: - `provider=codex` uses existing OpenAI/Codex auth settings (`OPENAI_AUTH_MODE`, `CODEX_API_KEY`, `OPENAI_API_KEY`). 
- `provider=claude` uses Claude auth resolution (`CLAUDE_CODE_OAUTH_TOKEN` preferred, otherwise `ANTHROPIC_API_KEY`, or existing Claude Code login state). - `CLAUDE_MODEL` should be a Claude model id/alias recognized by Claude Code (for example `claude-sonnet-4-6`); `anthropic/...` prefixes are normalized automatically. +- `CLAUDE_MAX_TURNS` controls the per-query Claude turn budget (default `2`). +- Claude provider runs can emit Claude SDK/CLI internals to stdout and/or NDJSON with `CLAUDE_OBSERVABILITY_*` settings. +- UI session-mode provider runs execute directly in orchestration-assigned task/base worktrees; provider adapters do not allocate additional nested worktrees. ## Manifest Semantics @@ -131,9 +138,9 @@ Pipeline edges can route via: Domain events are typed and can trigger edges directly: - planning: `requirements_defined`, `tasks_planned` -- execution: `code_committed`, `task_blocked` +- execution: `code_committed`, `task_ready_for_review`, `task_blocked` - validation: `validation_passed`, `validation_failed` -- integration: `branch_merged` +- integration: `branch_merged`, `merge_conflict_detected`, `merge_conflict_resolved`, `merge_conflict_unresolved`, `merge_retry_started` Actors can emit events in `ActorExecutionResult.events`. Pipeline status also emits default validation/execution events. @@ -202,6 +209,30 @@ Notes: - `security.tool.invocation_allowed` - `security.tool.invocation_blocked` +## Claude Observability + +- `CLAUDE_OBSERVABILITY_MODE=stdout` prints structured Claude query internals (tool progress, system events, stderr, result lifecycle) to stdout as JSON lines prefixed with `[claude-trace]`. +- `CLAUDE_OBSERVABILITY_MODE=file` appends the same records to `CLAUDE_OBSERVABILITY_LOG_PATH`. +- `CLAUDE_OBSERVABILITY_MODE=both` enables both outputs. +- Output samples high-frequency `tool_progress` events to avoid log flooding while retaining suppression counters. 
+- `assistant` and `user` message records are retained so turn flow is inspectable end-to-end. +- `CLAUDE_OBSERVABILITY_VERBOSITY=summary` stores compact metadata; `full` stores redacted full SDK message payloads. +- `CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL=true` enables and emits sampled partial assistant stream events from the SDK. +- `CLAUDE_OBSERVABILITY_DEBUG=true` enables Claude SDK debug mode. +- `CLAUDE_OBSERVABILITY_DEBUG_LOG_PATH` writes Claude SDK debug output to a file (also enables debug mode). +- In UI/provider mode, `CLAUDE_OBSERVABILITY_LOG_PATH` resolves relative to the repo workspace root. +- UI API: `GET /api/claude-trace?limit=&sessionId=` reads filtered Claude trace records. + +Example: + +```bash +CLAUDE_OBSERVABILITY_MODE=both +CLAUDE_OBSERVABILITY_VERBOSITY=summary +CLAUDE_OBSERVABILITY_LOG_PATH=.ai_ops/events/claude-trace.ndjson +CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL=false +CLAUDE_OBSERVABILITY_DEBUG=false +``` + ### Analytics Quick Start Inspect latest events: @@ -245,6 +276,7 @@ jq -c 'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson - Pipeline behavior on `SecurityViolationError` is configurable: - `hard_abort` (default) - `validation_fail` (retry-unrolled remediation) + - `dangerous_warn_only` (logs violations and continues execution; high risk) ## Environment Variables @@ -259,6 +291,13 @@ jq -c 'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson - `ANTHROPIC_API_KEY` (used when `CLAUDE_CODE_OAUTH_TOKEN` is unset) - `CLAUDE_MODEL` - `CLAUDE_CODE_PATH` +- `CLAUDE_MAX_TURNS` (integer >= 1, defaults to `2`) +- `CLAUDE_OBSERVABILITY_MODE` (`off`, `stdout`, `file`, or `both`) +- `CLAUDE_OBSERVABILITY_VERBOSITY` (`summary` or `full`) +- `CLAUDE_OBSERVABILITY_LOG_PATH` +- `CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL` (`true` or `false`) +- `CLAUDE_OBSERVABILITY_DEBUG` (`true` or `false`) +- `CLAUDE_OBSERVABILITY_DEBUG_LOG_PATH` - `MCP_CONFIG_PATH` ### Agent Manager Limits @@ -274,6 +313,7 @@ jq -c 
'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson - `AGENT_TOPOLOGY_MAX_DEPTH` - `AGENT_TOPOLOGY_MAX_RETRIES` - `AGENT_RELATIONSHIP_MAX_CHILDREN` +- `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS` ### Provisioning / Resource Controls @@ -289,7 +329,7 @@ jq -c 'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson ### Security Middleware -- `AGENT_SECURITY_VIOLATION_MODE` (`hard_abort` or `validation_fail`) +- `AGENT_SECURITY_VIOLATION_MODE` (`hard_abort`, `validation_fail`, or `dangerous_warn_only`) - `AGENT_SECURITY_ALLOWED_BINARIES` - `AGENT_SECURITY_COMMAND_TIMEOUT_MS` - `AGENT_SECURITY_AUDIT_LOG_PATH` diff --git a/docs/orchestration-engine.md b/docs/orchestration-engine.md index 20b27f4..a10e198 100644 --- a/docs/orchestration-engine.md +++ b/docs/orchestration-engine.md @@ -37,6 +37,11 @@ Before each actor invocation, orchestration resolves an immutable `ResolvedExecu This keeps orchestration policy resolution separate from executor enforcement. Executors do not need to parse manifests or MCP registry internals. +Worktree ownership invariant: + +- In UI session mode, orchestration/session lifecycle is the single owner of git worktree allocation. +- Provider adapters (Codex/Claude runtime wrappers) must execute inside `ResolvedExecutionContext.security.worktreePath` and must not provision independent worktrees. + ## Execution topology model - Pipeline graph execution is DAG-based with ready-node frontiers. @@ -52,10 +57,16 @@ This keeps orchestration policy resolution separate from executor enforcement. E - planning: `requirements_defined`, `tasks_planned` - - execution: `code_committed`, `task_blocked` + - execution: `code_committed`, `task_ready_for_review`, `task_blocked` - validation: `validation_passed`, `validation_failed` - - integration: `branch_merged` + - integration: `branch_merged`, `merge_conflict_detected`, `merge_conflict_resolved`, `merge_conflict_unresolved`, `merge_retry_started` - Pipeline edges can trigger on domain events (`edge.event`) in addition to legacy status triggers (`edge.on`).
- `history_has_event` route conditions evaluate persisted domain event history entries (`validation_failed`, `task_blocked`, etc.). +## Merge conflict orchestration + +- Task merge/close merge operations return structured outcomes (`success`, `conflict`, `fatal_error`) instead of throwing for conflicts. +- Task state supports conflict workflows (`conflict`, `resolving_conflict`) and conflict metadata is persisted under `task.metadata.mergeConflict`. +- Conflict retries are bounded by `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS`; exhaustion emits `merge_conflict_unresolved` and the session continues without crashing. + ## Security note Security enforcement now lives in `src/security`: diff --git a/docs/security-middleware.md b/docs/security-middleware.md index 04dd581..01e6303 100644 --- a/docs/security-middleware.md +++ b/docs/security-middleware.md @@ -30,6 +30,7 @@ This middleware provides a first-pass hardening layer for agent-executed shell c - `hard_abort` (default): fail fast and stop the pipeline. - `validation_fail`: map violation to retry-unrolled behavior so the actor can attempt a compliant alternative. +- `dangerous_warn_only`: emit security audit/runtime events but continue execution. This is intentionally unsafe and should only be used for temporary unblock/debug workflows. ## MCP integration diff --git a/human_only_TODO b/human_only_TODO index 395fa63..4d35941 100644 --- a/human_only_TODO +++ b/human_only_TODO @@ -10,21 +10,137 @@ # in progress +there is some major ui issue. there is app/provider logic wrapped up in the ui which i didnt know about or understand and it has gotten out of hand. we need to rip it out and clean it up. additionally the work trees are still not working as intended after like 5 attempts to fix it so that has got to be officially spaghetti at this point + +here is the takeaway from the ui app logic issue + + - Keep orchestration core in src/agents. 
+ - Move backend run/session/provider code out of src/ui into src/control-plane (or src/backend). + - Keep src/ui as static/frontend + API client only. + - Treat provider prompt shaping as an adapter concern (src/providers), not UI concern. +test results +session itself has a dir in worktrees that is a worktree +then there is a base dir and a tasks dir +base is also a worktree +inside of base, there is ANOTHER WORKTREE +inside of tasks is a product-intake??? directory +code is being written in both product-intake and the worktree in the base/worktrees/d3e411... directory +i dont think that the product guy is writing any files +fwiw, the dev agents are definitely making the app +log activity of claude code binary +WHY IS IT STILL NOT LOGGING WHAT IS ACTUALLY HAPPENING +it will not explain it, it just keeps adding different logs +test run +they are writing files! + +# problem 1 - logging +logging is still fucking dog dick fuck ass shit + +# problem 2 - worktree +the worktree shit is fucking insanity +they are getting confused because they see some of the orchestration infrastructure +they legit need to be in a clean room and know nothing about the world outside of their project going forward + +# problem 3 - task management/product context being passed in its entirety +the dev agents for some reason have the entire fucking task list in their context # Scheduled + So yes, the UI growing into “its own project” increases risk because orchestration logic leaks into UI-layer + services. + + Best refactor target: + + 1. Make UI a thin transport layer (HTTP in/out, no resource ownership decisions). + 2. Move run/session orchestration into one app-service module with a strict interface. + 3. Enforce single-owner invariants in code (worktree owner = session lifecycle only). + 4. Add contract tests around ownership boundaries (like the regression we just added). + +what even is src/examples ???? + + +clean up events/locks/ports (may not be needed with new session work?) 
+ui is gargantuan - needs a full rewrite in a different dir or something holy + +the ais aren't actually writing to the directory +the ui is fucking bad +it's kinda slow +i think the banned command thing is kind of restrictive, idk if they will really be able to do anything +codex isn't working? +i don't even know if this runs on linux at all +wtf is even happening in the backend i don't see any logs for it anywhere + +# identify potential design conflict dag vs things that happen not on the dag? +## linked to confusion around max env vars and what they do + + - "This runtime enforces a DAG, so merge-conflict retry loops are multi-run (rerun the same managed session) rather than in-run graph cycles." + - what are the implications of this? is this a stopgap until we do stricter control at the ends (task manager + git worktree cleanup)? or do we have a design inconsistency that we need to address? + + For structured-dev-workflow specifically + + - Happy path is depth 4 (product -> task -> dev -> qa -> merge). + - QA-fail/rework path reaches depth ~6. + - If merge-conflict resolver nodes are used, path can reach ~7. + +# confusion around max env vars and what they do +## linked to identify potential design conflict dag vs things that happen not on the dag? + +PROBLEM +.env has AGENT_MAX_RECURSIVE_DEPTH=3, but structured-dev-workflow requires depth 4 (product -> task -> dev -> qa -> merge) + - does this mean that if qa sends a task back to dev that the depth is 5? or does it literally mean num of agents in a branch of the dag? + - if it blocks, problem because it will just fail if we have to pass back + - if it doesn't, problem bc this does nothing - we are either allowing them to loop infinitely creating themselves OR we are relying on enforcing that elsewhere and we need to consolidate + +RESPONSE + +• Short answer: your concern is valid, and today the two depth limits are conflated more than they should be. + + 1. Does QA -> Dev make depth 5?
+ + - In this system, depth is not “number of unique agents.” It is path depth from the entry node (edge distance), plus recursive child depth when retries/fanout happen. + - Back-edges like qa -> dev to the same upstream node are not allowed because pipeline must be a strict DAG (manifest.ts (/home/zman/projects/ai_ops/src/agents/manifest.ts)). + - So “send back” is modeled as downstream rework nodes (qa -> dev-rework -> qa-rework -> merge), which does increase depth. + + 2. If it blocks, that’s bad; if not, does the limit do nothing? + + - It does block today, and it also does still serve a purpose. + - PipelineExecutor increments node depth by edge traversal (pipeline.ts (/home/zman/projects/ai_ops/src/agents/pipeline.ts:748), pipeline.ts (/home/zman/projects/ai_ops/src/agents/pipeline.ts:815)). + - But that same depth is passed into AgentManager.runRecursive (pipeline.ts (/home/zman/projects/ai_ops/src/agents/pipeline.ts:875)), and AgentManager enforces maxRecursiveDepth on that depth (manager.ts (/home/zman/projects/ai_ops/ + src/agents/manager.ts:343)). + - So AGENT_MAX_RECURSIVE_DEPTH is currently acting as a second pipeline-depth cap, not just a recursion cap. + + For structured-dev-workflow specifically + + - Happy path is depth 4 (product -> task -> dev -> qa -> merge). + - QA-fail/rework path reaches depth ~6. + - If merge-conflict resolver nodes are used, path can reach ~7. + - So with .env values like AGENT_MAX_RECURSIVE_DEPTH=3 and AGENT_TOPOLOGY_MAX_DEPTH=4, fallback branches will fail. + +SOLUTION + + 2. Structural fix (better): + + - Decouple meanings: + - AGENT_TOPOLOGY_MAX_DEPTH should gate DAG traversal depth only. + - AGENT_MAX_RECURSIVE_DEPTH should gate recursive fanout/retry depth only. + - In practice: stop passing pipeline queue depth into manager recursive depth; start recursive runs at a local depth baseline per node. + + 3. 
Safety/clarity guard: + + - Add a preflight check that computes max possible DAG depth and warns/errors if env depth limits are below it. + # other scheduled - persona definitions @@ -555,4 +671,150 @@ Manifest Builder: A UI to visually build or edit the AgentManifest (Schema "1"), Security Policy Management: An interface mapped to src/security/schemas.ts. This allows admins to define AGENT_SECURITY_ALLOWED_BINARIES, toggle AGENT_SECURITY_VIOLATION_MODE (hard_abort vs validation_fail), and manage MCP tool allowlists/banlists. -Environment & Resource Limits: Simple forms to configure agent manager limits (AGENT_MAX_CONCURRENT) and port block sizing without manually editing the .env file. \ No newline at end of file +Environment & Resource Limits: Simple forms to configure agent manager limits (AGENT_MAX_CONCURRENT) and port block sizing without manually editing the .env file. + + +# Architecture Requirements: Session Isolation & Task-Scoped Worktrees + +## Objective + +Disentangle the `ai_ops` control plane from the target project data plane. Replace the implicit `process.cwd()` execution anchor with a formal Session lifecycle and dynamic, task-scoped Git worktrees. This ensures concurrent agents operate in isolated environments and prevents the runtime from mutating its own repository. + +## 1. Domain Definitions + +- **Target Project:** The absolute local path to the repository being operated on (e.g., `/home/user/target_repo`). + +- **Session (The Clean Room):** A persistent orchestration context strictly bound to one Target Project. It maintains a "Base Workspace" (a localized Git checkout/branch) that represents the integrated, approved state of the current work period. + +- **Task Worktree:** An ephemeral Git worktree branched from the Session's Base Workspace. It is scoped strictly to a `taskId`, enabling multi-agent handoffs (e.g., Coder $\rightarrow$ QA) within the same isolated environment before merging back to the Base Workspace. + + +## 2. 
Core Data Model Updates + +Introduce explicit types to track project binding and resource ownership. + +- **API Payloads:** + + TypeScript + + ``` + interface CreateSessionRequest { + projectPath: string; // Absolute local path to target repo + } + ``` + +- **Session State (`AGENT_STATE_ROOT`):** + + TypeScript + + ``` + interface SessionMetadata { + sessionId: string; + projectPath: string; + sessionStatus: 'active' | 'suspended' | 'closed'; + baseWorkspacePath: string; // e.g., ${AGENT_WORKTREE_ROOT}/${sessionId}/base + createdAt: string; + updatedAt: string; + } + ``` + +- **Project Context (`src/agents/project-context.ts`):** + + Update the `taskQueue` schema to act as the persistent ledger for worktree ownership. + + TypeScript + + ``` + interface TaskRecord { + taskId: string; + status: 'pending' | 'in_progress' | 'review' | 'merged' | 'failed'; + worktreePath?: string; // e.g., ${AGENT_WORKTREE_ROOT}/${sessionId}/tasks/${taskId} + } + ``` + + +## 3. API & Control Plane (`src/ui/server.ts`) + +Replace implicit session generation with an explicit lifecycle API. + +- `POST /api/sessions`: Accepts `CreateSessionRequest`. Initializes the SessionMetadata and provisions the Base Workspace. + +- `GET /api/sessions`: Returns existing sessions for resuming work across restarts. + +- `POST /api/sessions/:id/run`: Triggers `SchemaDrivenExecutionEngine.runSession(...)`, passing the resolved `SessionMetadata`. + +- `POST /api/sessions/:id/close`: Prunes all task worktrees, optionally merges the Base Workspace back to the original `projectPath`, and marks the session closed. + + +## 4. Provisioning Layer (`src/agents/provisioning.ts`) + +Remove all fallback logic relying on `process.cwd()`. + +- **Session Initialization:** Clone or create a primary worktree of `projectPath` into `baseWorkspacePath`. + +- **Task Provisioning:** When a task begins execution, check out a new branch from the Base Workspace and provision it at `worktreePath`. 
+ +- **Security & MCP Isolation:** `SecureCommandExecutor` and MCP handler configurations must dynamically anchor their working directories to the specific `worktreePath` injected into the execution context, preventing traversal outside the task scope. + + +## 5. Orchestration & Routing (`src/agents/pipeline.ts`) + +Implement the hybrid routing model: Domain Events for control flow, Project Context for resource lookup. + +1. **The Signal (Domain Events):** When a Coder agent finishes, it emits a standard domain event (e.g., `task_ready_for_review` with the `taskId`). The pipeline routes this event to trigger the QA agent. + +2. **The Map (Project Context):** Before initializing the QA agent's sandbox, the lifecycle observer/engine reads `project-context.ts` to look up the `worktreePath` associated with that `taskId`. + +3. **The Execution:** The QA agent boots inside the exact same Task Worktree the Coder agent just vacated, preserving all uncommitted files and local state. + +4. **The Merge:** Upon successful QA (e.g., `validation_passed`), the orchestration layer commits the Task Worktree, merges it into the Base Workspace, and deletes the Task Worktree. + + +# turning merge conflicts into first-class orchestration events instead of hard exceptions. + + 1. Add new domain events: + + - merge_conflict_detected + - merge_conflict_resolved + - merge_conflict_unresolved (after max attempts) + - optionally merge_retry_started + + 2. Extend task state model with conflict-aware statuses: + + - add conflict (and maybe resolving_conflict) + + 3. Change merge code path to return structured outcomes instead of throwing on conflict: + + - success + - conflict (with conflictFiles, mergeBase, taskId, worktreePath) + - fatal_error + - only throw for truly fatal cases (repo corruption, missing worktree, etc.) + + 4. On conflict, patch project context + emit event: + + - set task to conflict + - store conflict metadata in task.metadata + - emit merge_conflict_detected + + 5. 
Route conflict events to dedicated resolver personas in the pipeline: + + - Coder/QA conflict-resolver agent opens same worktreePath + - resolves conflict markers, runs checks + - emits merge_conflict_resolved + + 6. Retry merge after resolution event: + + - integration node attempts merge again + - if successful, emit branch_merged, mark merged, prune task worktree + - if still conflicting, loop with bounded retries + + 7. Add retry guardrails: + + - max conflict-resolution attempts per task + - on exhaustion emit merge_conflict_unresolved and stop cleanly (not crash the whole session) + + 8. Apply same pattern to session close (base -> project) so close can become: + + - conflict workflow or “closed_with_conflicts” state, rather than a hard failure. + + This keeps the app stable and lets agents handle conflicts as part of normal orchestration. \ No newline at end of file diff --git a/src/agents/domain-events.ts b/src/agents/domain-events.ts index 63290f3..9b815ae 100644 --- a/src/agents/domain-events.ts +++ b/src/agents/domain-events.ts @@ -2,9 +2,14 @@ import { randomUUID } from "node:crypto"; import type { JsonObject } from "./types.js"; export type PlanningDomainEventType = "requirements_defined" | "tasks_planned"; -export type ExecutionDomainEventType = "code_committed" | "task_blocked"; +export type ExecutionDomainEventType = "code_committed" | "task_blocked" | "task_ready_for_review"; export type ValidationDomainEventType = "validation_passed" | "validation_failed"; -export type IntegrationDomainEventType = "branch_merged"; +export type IntegrationDomainEventType = + | "branch_merged" + | "merge_conflict_detected" + | "merge_conflict_resolved" + | "merge_conflict_unresolved" + | "merge_retry_started"; export type DomainEventType = | PlanningDomainEventType @@ -46,9 +51,14 @@ const DOMAIN_EVENT_TYPES = new Set([ "tasks_planned", "code_committed", "task_blocked", + "task_ready_for_review", "validation_passed", "validation_failed", "branch_merged", + 
"merge_conflict_detected", + "merge_conflict_resolved", + "merge_conflict_unresolved", + "merge_retry_started", ]); export function isDomainEventType(value: string): value is DomainEventType { diff --git a/src/agents/lifecycle-observer.ts b/src/agents/lifecycle-observer.ts index 40b86af..67fa2a7 100644 --- a/src/agents/lifecycle-observer.ts +++ b/src/agents/lifecycle-observer.ts @@ -50,10 +50,14 @@ function toNodeAttemptSeverity(status: ActorResultStatus): RuntimeEventSeverity } function toDomainEventSeverity(type: DomainEventType): RuntimeEventSeverity { - if (type === "task_blocked") { + if (type === "task_blocked" || type === "merge_conflict_unresolved") { return "critical"; } - if (type === "validation_failed") { + if ( + type === "validation_failed" || + type === "merge_conflict_detected" || + type === "merge_retry_started" + ) { return "warning"; } return "info"; diff --git a/src/agents/orchestration.ts b/src/agents/orchestration.ts index 73a65d9..611670f 100644 --- a/src/agents/orchestration.ts +++ b/src/agents/orchestration.ts @@ -2,6 +2,7 @@ import { resolve } from "node:path"; import { getConfig, loadConfig, type AppConfig } from "../config.js"; import { createDefaultMcpRegistry, loadMcpConfigFromEnv, McpRegistry } from "../mcp.js"; import { parseAgentManifest, type AgentManifest } from "./manifest.js"; +import type { DomainEventEmission } from "./domain-events.js"; import { AgentManager } from "./manager.js"; import { PersonaRegistry, @@ -13,12 +14,19 @@ import { type ActorExecutionSecurityContext, type ActorExecutor, type PipelineRunSummary, + type TaskExecutionLifecycle, } from "./pipeline.js"; -import { FileSystemProjectContextStore } from "./project-context.js"; +import { + FileSystemProjectContextStore, + type ProjectTask, + type ProjectTaskStatus, +} from "./project-context.js"; import { FileSystemStateContextManager, type StoredSessionState } from "./state-context.js"; import type { JsonObject } from "./types.js"; +import { SessionWorktreeManager, 
type SessionMetadata } from "./session-lifecycle.js"; import { SecureCommandExecutor, + type SecurityViolationHandling, type SecurityAuditEvent, type SecurityAuditSink, SecurityRulesEngine, @@ -38,7 +46,8 @@ export type OrchestrationSettings = { maxDepth: number; maxRetries: number; maxChildren: number; - securityViolationHandling: "hard_abort" | "validation_fail"; + mergeConflictMaxAttempts: number; + securityViolationHandling: SecurityViolationHandling; runtimeContext: Record; }; @@ -56,6 +65,7 @@ export function loadOrchestrationSettingsFromEnv( maxDepth: config.orchestration.maxDepth, maxRetries: config.orchestration.maxRetries, maxChildren: config.orchestration.maxChildren, + mergeConflictMaxAttempts: config.orchestration.mergeConflictMaxAttempts, securityViolationHandling: config.security.violationHandling, }; } @@ -181,6 +191,9 @@ function createActorSecurityContext(input: { type: `security.${event.type}`, severity: mapSecurityAuditSeverity(event), message: toSecurityAuditMessage(event), + ...(event.sessionId ? { sessionId: event.sessionId } : {}), + ...(event.nodeId ? { nodeId: event.nodeId } : {}), + ...(typeof event.attempt === "number" ? 
{ attempt: event.attempt } : {}), metadata: toSecurityAuditMetadata(event), }); }; @@ -199,6 +212,9 @@ function createActorSecurityContext(input: { blockedEnvAssignments: ["AGENT_STATE_ROOT", "AGENT_PROJECT_CONTEXT_PATH"], }, auditSink, + { + violationHandling: input.settings.securityViolationHandling, + }, ); return { @@ -221,6 +237,57 @@ function createActorSecurityContext(input: { }; } +function resolveSessionProjectContextPath(stateRoot: string, sessionId: string): string { + return resolve(stateRoot, sessionId, "project-context.json"); +} + +function readTaskIdFromPayload(payload: JsonObject, fallback: string): string { + const candidates = [payload.taskId, payload.task_id, payload.task]; + for (const candidate of candidates) { + if (typeof candidate === "string" && candidate.trim().length > 0) { + return candidate.trim(); + } + } + return fallback; +} + +function toTaskStatusForFailure( + resultStatus: "validation_fail" | "failure", + statusAtStart: string, +): ProjectTaskStatus { + if (resultStatus === "failure") { + return "failed"; + } + if (statusAtStart === "conflict" || statusAtStart === "resolving_conflict") { + return "conflict"; + } + return "in_progress"; +} + +function shouldMergeFromStatus(statusAtStart: string): boolean { + return statusAtStart === "review" || statusAtStart === "resolving_conflict"; +} + +function toTaskIdLabel(task: ProjectTask): string { + return task.taskId || task.id || "task"; +} + +function toJsonObject(value: unknown): JsonObject | undefined { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return undefined; + } + return value as JsonObject; +} + +function readMergeConflictAttempts(metadata: JsonObject | undefined): number { + const record = toJsonObject(metadata?.mergeConflict); + const attempts = record?.attempts; + if (typeof attempts === "number" && Number.isInteger(attempts) && attempts >= 0) { + return attempts; + } + return 0; +} + export class SchemaDrivenExecutionEngine { private readonly 
manifest: AgentManifest; private readonly personaRegistry = new PersonaRegistry(); @@ -234,6 +301,7 @@ export class SchemaDrivenExecutionEngine { private readonly mcpRegistry: McpRegistry; private readonly runtimeEventPublisher: RuntimeEventPublisher; private readonly securityContext: ActorExecutionSecurityContext; + private readonly sessionWorktreeManager: SessionWorktreeManager; constructor(input: { manifest: AgentManifest | unknown; @@ -260,6 +328,8 @@ export class SchemaDrivenExecutionEngine { maxDepth: input.settings?.maxDepth ?? config.orchestration.maxDepth, maxRetries: input.settings?.maxRetries ?? config.orchestration.maxRetries, maxChildren: input.settings?.maxChildren ?? config.orchestration.maxChildren, + mergeConflictMaxAttempts: + input.settings?.mergeConflictMaxAttempts ?? config.orchestration.mergeConflictMaxAttempts, securityViolationHandling: input.settings?.securityViolationHandling ?? config.security.violationHandling, runtimeContext: { @@ -273,6 +343,11 @@ export class SchemaDrivenExecutionEngine { this.projectContextStore = new FileSystemProjectContextStore({ filePath: this.settings.projectContextPath, }); + this.sessionWorktreeManager = new SessionWorktreeManager({ + worktreeRoot: resolve(this.settings.workspaceRoot, this.config.provisioning.gitWorktree.rootDirectory), + baseRef: this.config.provisioning.gitWorktree.baseRef, + targetPath: this.config.provisioning.gitWorktree.targetPath, + }); this.actorExecutors = toExecutorMap(input.actorExecutors); this.manager = @@ -352,9 +427,26 @@ export class SchemaDrivenExecutionEngine { initialPayload: JsonObject; initialState?: Partial; signal?: AbortSignal; + sessionMetadata?: SessionMetadata; }): Promise { const managerSessionId = `${input.sessionId}__pipeline`; const managerSession = this.manager.createSession(managerSessionId); + const workspaceRoot = input.sessionMetadata + ? 
this.sessionWorktreeManager.resolveWorkingDirectoryForWorktree( + input.sessionMetadata.baseWorkspacePath, + ) + : this.settings.workspaceRoot; + const projectContextStore = input.sessionMetadata + ? new FileSystemProjectContextStore({ + filePath: resolveSessionProjectContextPath(this.settings.stateRoot, input.sessionId), + }) + : this.projectContextStore; + const taskLifecycle = input.sessionMetadata + ? this.createTaskExecutionLifecycle({ + session: input.sessionMetadata, + projectContextStore, + }) + : undefined; const executor = new PipelineExecutor( this.manifest, @@ -362,25 +454,26 @@ export class SchemaDrivenExecutionEngine { this.stateManager, this.actorExecutors, { - workspaceRoot: this.settings.workspaceRoot, + workspaceRoot, runtimeContext: this.settings.runtimeContext, defaultModelConstraint: this.config.provider.claudeModel, resolvedExecutionSecurityConstraints: { dropUid: this.config.security.dropUid !== undefined, dropGid: this.config.security.dropGid !== undefined, - worktreePath: this.settings.workspaceRoot, + worktreePath: workspaceRoot, violationMode: this.settings.securityViolationHandling, }, maxDepth: Math.min(this.settings.maxDepth, this.manifest.topologyConstraints.maxDepth), maxRetries: Math.min(this.settings.maxRetries, this.manifest.topologyConstraints.maxRetries), manager: this.manager, managerSessionId, - projectContextStore: this.projectContextStore, - resolveMcpConfig: ({ providerHint, prompt, toolClearance }) => + projectContextStore, + resolveMcpConfig: ({ providerHint, prompt, toolClearance, workingDirectory }) => loadMcpConfigFromEnv( { providerHint, prompt, + ...(workingDirectory ? { workingDirectory } : {}), }, { config: this.config, @@ -391,6 +484,7 @@ export class SchemaDrivenExecutionEngine { securityViolationHandling: this.settings.securityViolationHandling, securityContext: this.securityContext, runtimeEventPublisher: this.runtimeEventPublisher, + ...(taskLifecycle ? 
{ taskLifecycle } : {}), }, ); try { @@ -405,6 +499,335 @@ export class SchemaDrivenExecutionEngine { } } + private createTaskExecutionLifecycle(input: { + session: SessionMetadata; + projectContextStore: FileSystemProjectContextStore; + }): TaskExecutionLifecycle { + return { + prepareTaskExecution: async ({ node, context }) => { + const taskId = readTaskIdFromPayload(context.handoff.payload, node.id); + const projectContext = await input.projectContextStore.readState(); + const existing = projectContext.taskQueue.find( + (task) => toTaskIdLabel(task) === taskId, + ); + + const ensured = await this.sessionWorktreeManager.ensureTaskWorktree({ + sessionId: input.session.sessionId, + taskId, + baseWorkspacePath: input.session.baseWorkspacePath, + ...(existing?.worktreePath ? { existingWorktreePath: existing.worktreePath } : {}), + }); + + const statusAtStart: ProjectTaskStatus = + existing?.status === "review" || + existing?.status === "conflict" || + existing?.status === "resolving_conflict" + ? existing.status + : "in_progress"; + + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId, + id: taskId, + status: statusAtStart, + worktreePath: ensured.taskWorktreePath, + ...(existing?.title ? { title: existing.title } : { title: taskId }), + ...(existing?.metadata ? { metadata: existing.metadata } : {}), + }, + ], + }); + + return { + taskId, + workingDirectory: ensured.taskWorkingDirectory, + worktreePath: ensured.taskWorktreePath, + statusAtStart, + ...(existing?.metadata ? { metadata: existing.metadata } : {}), + }; + }, + finalizeTaskExecution: async ({ task, result, domainEvents }) => { + const emittedTypes = new Set(domainEvents.map((event) => event.type)); + const additionalEvents: DomainEventEmission[] = []; + const emitEvent = ( + type: DomainEventEmission["type"], + payload?: DomainEventEmission["payload"], + ): void => { + if (emittedTypes.has(type)) { + return; + } + emittedTypes.add(type); + additionalEvents.push(payload ? 
{ type, payload } : { type }); + }; + + if (result.status === "failure" || result.status === "validation_fail") { + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId: task.taskId, + id: task.taskId, + status: toTaskStatusForFailure(result.status, task.statusAtStart), + worktreePath: task.worktreePath, + title: task.taskId, + ...(task.metadata ? { metadata: task.metadata } : {}), + }, + ], + }); + return; + } + + if (task.statusAtStart === "conflict") { + const attempts = readMergeConflictAttempts(task.metadata); + const metadata: JsonObject = { + ...(task.metadata ?? {}), + mergeConflict: { + attempts, + maxAttempts: this.settings.mergeConflictMaxAttempts, + status: "resolved", + resolvedAt: new Date().toISOString(), + }, + }; + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId: task.taskId, + id: task.taskId, + status: "resolving_conflict", + worktreePath: task.worktreePath, + title: task.taskId, + metadata, + }, + ], + }); + + emitEvent("merge_conflict_resolved", { + summary: `Merge conflicts resolved for task "${task.taskId}".`, + details: { + taskId: task.taskId, + worktreePath: task.worktreePath, + attempts, + }, + }); + + return { + additionalEvents, + handoffPayloadPatch: { + taskId: task.taskId, + worktreePath: task.worktreePath, + mergeConflictStatus: "resolved", + mergeConflictAttempts: attempts, + } as JsonObject, + }; + } + + if (shouldMergeFromStatus(task.statusAtStart)) { + const attemptsBeforeMerge = readMergeConflictAttempts(task.metadata); + if (task.statusAtStart === "resolving_conflict") { + emitEvent("merge_retry_started", { + summary: `Retrying merge for task "${task.taskId}".`, + details: { + taskId: task.taskId, + worktreePath: task.worktreePath, + nextAttempt: attemptsBeforeMerge + 1, + maxAttempts: this.settings.mergeConflictMaxAttempts, + }, + }); + } + + const mergeOutcome = await this.sessionWorktreeManager.mergeTaskIntoBase({ + taskId: task.taskId, + baseWorkspacePath: 
input.session.baseWorkspacePath, + taskWorktreePath: task.worktreePath, + }); + + if (mergeOutcome.kind === "success") { + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId: task.taskId, + id: task.taskId, + status: "merged", + title: task.taskId, + metadata: { + ...(task.metadata ?? {}), + mergeConflict: { + attempts: attemptsBeforeMerge, + maxAttempts: this.settings.mergeConflictMaxAttempts, + status: "merged", + mergedAt: new Date().toISOString(), + }, + }, + }, + ], + }); + + emitEvent("branch_merged", { + summary: `Task "${task.taskId}" merged into session base branch.`, + details: { + taskId: task.taskId, + worktreePath: task.worktreePath, + }, + }); + + return { + additionalEvents, + handoffPayloadPatch: { + taskId: task.taskId, + mergeStatus: "merged", + } as JsonObject, + }; + } + + if (mergeOutcome.kind === "conflict") { + const attempts = attemptsBeforeMerge + 1; + const exhausted = attempts >= this.settings.mergeConflictMaxAttempts; + const metadata: JsonObject = { + ...(task.metadata ?? {}), + mergeConflict: { + attempts, + maxAttempts: this.settings.mergeConflictMaxAttempts, + status: exhausted ? "unresolved" : "conflict", + conflictFiles: mergeOutcome.conflictFiles, + worktreePath: mergeOutcome.worktreePath, + detectedAt: new Date().toISOString(), + ...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}), + }, + }; + + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId: task.taskId, + id: task.taskId, + status: "conflict", + worktreePath: task.worktreePath, + title: task.taskId, + metadata, + }, + ], + }); + + emitEvent("merge_conflict_detected", { + summary: `Merge conflict detected for task "${task.taskId}".`, + details: { + taskId: task.taskId, + worktreePath: mergeOutcome.worktreePath, + conflictFiles: mergeOutcome.conflictFiles, + attempts, + maxAttempts: this.settings.mergeConflictMaxAttempts, + ...(mergeOutcome.mergeBase ? 
{ mergeBase: mergeOutcome.mergeBase } : {}), + }, + }); + + if (exhausted) { + emitEvent("merge_conflict_unresolved", { + summary: + `Merge conflict attempts exhausted for task "${task.taskId}" ` + + `(${String(attempts)}/${String(this.settings.mergeConflictMaxAttempts)}).`, + details: { + taskId: task.taskId, + worktreePath: mergeOutcome.worktreePath, + conflictFiles: mergeOutcome.conflictFiles, + attempts, + maxAttempts: this.settings.mergeConflictMaxAttempts, + }, + }); + } + + return { + additionalEvents, + handoffPayloadPatch: { + taskId: task.taskId, + worktreePath: task.worktreePath, + mergeConflictStatus: exhausted ? "unresolved" : "conflict", + mergeConflictAttempts: attempts, + mergeConflictMaxAttempts: this.settings.mergeConflictMaxAttempts, + mergeConflictFiles: mergeOutcome.conflictFiles, + ...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}), + } as JsonObject, + }; + } + + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId: task.taskId, + id: task.taskId, + status: "failed", + worktreePath: task.worktreePath, + title: task.taskId, + metadata: { + ...(task.metadata ?? {}), + mergeConflict: { + attempts: attemptsBeforeMerge, + maxAttempts: this.settings.mergeConflictMaxAttempts, + status: "fatal_error", + error: mergeOutcome.error, + ...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}), + }, + }, + }, + ], + }); + + emitEvent("merge_conflict_unresolved", { + summary: `Fatal merge error for task "${task.taskId}".`, + details: { + taskId: task.taskId, + worktreePath: mergeOutcome.worktreePath, + error: mergeOutcome.error, + ...(mergeOutcome.mergeBase ? 
{ mergeBase: mergeOutcome.mergeBase } : {}), + }, + }); + emitEvent("task_blocked", { + summary: `Task "${task.taskId}" blocked due to fatal merge error.`, + details: { + taskId: task.taskId, + error: mergeOutcome.error, + }, + }); + + return { + additionalEvents, + handoffPayloadPatch: { + taskId: task.taskId, + worktreePath: task.worktreePath, + mergeStatus: "fatal_error", + mergeError: mergeOutcome.error, + } as JsonObject, + }; + } + + const nextMetadata = task.metadata + ? { + ...task.metadata, + } + : undefined; + + await input.projectContextStore.patchState({ + upsertTasks: [ + { + taskId: task.taskId, + id: task.taskId, + status: "review", + worktreePath: task.worktreePath, + title: task.taskId, + ...(nextMetadata ? { metadata: nextMetadata } : {}), + }, + ], + }); + + if (additionalEvents.length > 0) { + return { + additionalEvents, + }; + } + + return; + }, + }; + } + private assertRelationshipConstraints(): void { for (const [parent, edges] of this.childrenByParent.entries()) { if (edges.length > this.settings.maxChildren) { diff --git a/src/agents/pipeline.ts b/src/agents/pipeline.ts index 21390ff..8a497a2 100644 --- a/src/agents/pipeline.ts +++ b/src/agents/pipeline.ts @@ -63,6 +63,7 @@ export type ActorExecutionResult = { export type ActorToolPermissionResult = | { behavior: "allow"; + updatedInput?: Record; toolUseID?: string; } | { @@ -107,6 +108,8 @@ export type ResolvedExecutionContext = { export type ActorExecutionInput = { sessionId: string; + attempt: number; + depth: number; node: PipelineNode; prompt: string; context: NodeExecutionContext; @@ -153,6 +156,7 @@ export type PipelineExecutorOptions = { securityViolationHandling?: SecurityViolationHandling; securityContext?: ActorExecutionSecurityContext; runtimeEventPublisher?: RuntimeEventPublisher; + taskLifecycle?: TaskExecutionLifecycle; }; export type ActorExecutionSecurityContext = { @@ -166,6 +170,35 @@ export type ActorExecutionSecurityContext = { }) => SecureCommandExecutor; }; +export 
type TaskExecutionResolution = { + taskId: string; + workingDirectory: string; + worktreePath: string; + statusAtStart: string; + metadata?: JsonObject; +}; + +export type TaskExecutionLifecycle = { + prepareTaskExecution: (input: { + sessionId: string; + node: PipelineNode; + context: NodeExecutionContext; + }) => Promise; + finalizeTaskExecution: (input: { + sessionId: string; + node: PipelineNode; + task: TaskExecutionResolution; + result: ActorExecutionResult; + domainEvents: DomainEvent[]; + }) => Promise< + | void + | { + additionalEvents?: DomainEventEmission[]; + handoffPayloadPatch?: JsonObject; + } + >; +}; + type QueueItem = { nodeId: string; depth: number; @@ -612,9 +645,11 @@ export class PipelineExecutor { globalFlags: { ...projectContext.globalFlags }, artifactPointers: { ...projectContext.artifactPointers }, taskQueue: projectContext.taskQueue.map((task) => ({ - id: task.id, - title: task.title, + taskId: task.taskId, + id: task.id ?? task.taskId, + ...(task.title ? { title: task.title } : {}), status: task.status, + ...(task.worktreePath ? { worktreePath: task.worktreePath } : {}), ...(task.assignee ? { assignee: task.assignee } : {}), ...(task.metadata ? { metadata: task.metadata } : {}), })), @@ -886,6 +921,13 @@ export class PipelineExecutor { })(); const context = await this.stateManager.buildFreshNodeContext(sessionId, node.id); + const taskResolution = this.options.taskLifecycle + ? 
await this.options.taskLifecycle.prepareTaskExecution({ + sessionId, + node, + context, + }) + : undefined; const prompt = this.personaRegistry.renderSystemPrompt({ personaId: node.personaId, runtimeContext: { @@ -901,10 +943,13 @@ export class PipelineExecutor { node, toolClearance, prompt, + worktreePathOverride: taskResolution?.workingDirectory, }); const result = await this.invokeActorExecutor({ sessionId, + attempt, + depth: recursiveDepth, node, prompt, context, @@ -921,12 +966,50 @@ export class PipelineExecutor { customEvents: result.events, }); const topologyKind: NodeTopologyKind = node.topology?.kind ?? "sequential"; - const payloadForNext = result.payload ?? context.handoff.payload; + let payloadForNext: JsonObject = { + ...context.handoff.payload, + ...(result.payload ?? {}), + ...(taskResolution + ? { + taskId: taskResolution.taskId, + workingDirectory: taskResolution.workingDirectory, + worktreePath: taskResolution.worktreePath, + } + : {}), + }; const shouldRetry = result.status === "validation_fail" && this.shouldRetryValidation(node) && attempt <= maxRetriesForNode; + if (taskResolution && this.options.taskLifecycle) { + const finalization = await this.options.taskLifecycle.finalizeTaskExecution({ + sessionId, + node, + task: taskResolution, + result, + domainEvents, + }); + for (const eventEmission of finalization?.additionalEvents ?? 
[]) { + domainEvents.push( + createDomainEvent({ + type: eventEmission.type, + source: "pipeline", + sessionId, + nodeId: node.id, + attempt, + payload: eventEmission.payload, + }), + ); + } + if (finalization?.handoffPayloadPatch) { + payloadForNext = { + ...payloadForNext, + ...finalization.handoffPayloadPatch, + }; + } + } + await this.lifecycleObserver.onNodeAttempt({ sessionId, node, @@ -1021,6 +1104,8 @@ export class PipelineExecutor { private async invokeActorExecutor(input: { sessionId: string; + attempt: number; + depth: number; node: PipelineNode; prompt: string; context: NodeExecutionContext; @@ -1033,12 +1118,20 @@ export class PipelineExecutor { return await input.executor({ sessionId: input.sessionId, + attempt: input.attempt, + depth: input.depth, node: input.node, prompt: input.prompt, context: input.context, signal: input.signal, executionContext: input.executionContext, - mcp: this.buildActorMcpContext(input.executionContext, input.prompt), + mcp: this.buildActorMcpContext({ + sessionId: input.sessionId, + nodeId: input.node.id, + attempt: input.attempt, + executionContext: input.executionContext, + prompt: input.prompt, + }), security: this.securityContext, }); } catch (error) { @@ -1079,9 +1172,15 @@ export class PipelineExecutor { node: PipelineNode; toolClearance: ToolClearancePolicy; prompt: string; + worktreePathOverride?: string; }): ResolvedExecutionContext { const normalizedToolClearance = parseToolClearancePolicy(input.toolClearance); - const toolUniverse = this.resolveAvailableToolsForAttempt(normalizedToolClearance, input.prompt); + const worktreePath = input.worktreePathOverride ?? 
this.options.resolvedExecutionSecurityConstraints.worktreePath; + const toolUniverse = this.resolveAvailableToolsForAttempt({ + toolClearance: normalizedToolClearance, + prompt: input.prompt, + worktreePath, + }); const allowedTools = this.resolveAllowedToolsForAttempt({ toolClearance: normalizedToolClearance, toolUniverse, @@ -1097,6 +1196,7 @@ export class PipelineExecutor { allowedTools, security: { ...this.options.resolvedExecutionSecurityConstraints, + worktreePath, }, }; } @@ -1119,15 +1219,20 @@ export class PipelineExecutor { return []; } - private resolveAvailableToolsForAttempt(toolClearance: ToolClearancePolicy, prompt: string): string[] { + private resolveAvailableToolsForAttempt(input: { + toolClearance: ToolClearancePolicy; + prompt: string; + worktreePath: string; + }): string[] { if (!this.options.resolveMcpConfig) { return []; } const resolved = this.options.resolveMcpConfig({ providerHint: "codex", - prompt, - toolClearance, + prompt: input.prompt, + workingDirectory: input.worktreePath, + toolClearance: input.toolClearance, }); const rawServers = resolved.codexConfig?.mcp_servers; @@ -1147,10 +1252,14 @@ export class PipelineExecutor { return dedupeStrings(tools); } - private buildActorMcpContext( - executionContext: ResolvedExecutionContext, - prompt: string, - ): ActorExecutionMcpContext { + private buildActorMcpContext(input: { + sessionId: string; + nodeId: string; + attempt: number; + executionContext: ResolvedExecutionContext; + prompt: string; + }): ActorExecutionMcpContext { + const { executionContext, prompt } = input; const toolPolicy = toAllowedToolPolicy(executionContext.allowedTools); const filterToolsForProvider = (tools: string[]): string[] => { const deduped = dedupeStrings(tools); @@ -1161,6 +1270,7 @@ export class PipelineExecutor { ? 
this.options.resolveMcpConfig({ providerHint: "both", prompt, + workingDirectory: executionContext.security.worktreePath, toolClearance: toolPolicy, }) : {}; @@ -1169,7 +1279,12 @@ export class PipelineExecutor { executionContext.allowedTools, ); const resolveConfig = (context: McpLoadContext = {}): LoadedMcpConfig => { - if (context.providerHint === "codex") { + const withWorkingDirectory: McpLoadContext = { + ...context, + ...(context.workingDirectory ? {} : { workingDirectory: executionContext.security.worktreePath }), + }; + + if (withWorkingDirectory.providerHint === "codex") { return { ...(resolvedConfig.codexConfig ? { codexConfig: cloneMcpConfig(resolvedConfig).codexConfig } : {}), ...(resolvedConfig.sourcePath ? { sourcePath: resolvedConfig.sourcePath } : {}), @@ -1179,7 +1294,7 @@ export class PipelineExecutor { }; } - if (context.providerHint === "claude") { + if (withWorkingDirectory.providerHint === "claude") { return { ...(resolvedConfig.claudeMcpServers ? { claudeMcpServers: cloneMcpConfig(resolvedConfig).claudeMcpServers } @@ -1195,7 +1310,13 @@ export class PipelineExecutor { }; const createToolPermissionHandler = (): ActorToolPermissionHandler => - this.createToolPermissionHandler(executionContext.allowedTools); + this.createToolPermissionHandler({ + allowedTools: executionContext.allowedTools, + violationMode: executionContext.security.violationMode, + sessionId: input.sessionId, + nodeId: input.nodeId, + attempt: input.attempt, + }); return { allowedTools: [...executionContext.allowedTools], @@ -1207,13 +1328,24 @@ export class PipelineExecutor { }; } - private createToolPermissionHandler(allowedTools: readonly string[]): ActorToolPermissionHandler { - const allowset = new Set(allowedTools); - const caseInsensitiveAllowLookup = buildCaseInsensitiveToolLookup(allowedTools); + private createToolPermissionHandler(input: { + allowedTools: readonly string[]; + violationMode: SecurityViolationHandling; + sessionId: string; + nodeId: string; + attempt: 
number; + }): ActorToolPermissionHandler { + const allowset = new Set(input.allowedTools); + const caseInsensitiveAllowLookup = buildCaseInsensitiveToolLookup(input.allowedTools); const rulesEngine = this.securityContext?.rulesEngine; - const toolPolicy = toAllowedToolPolicy(allowedTools); + const toolPolicy = toAllowedToolPolicy(input.allowedTools); + const toolAuditContext = { + sessionId: input.sessionId, + nodeId: input.nodeId, + attempt: input.attempt, + }; - return async (toolName, _input, options) => { + return async (toolName, toolInput, options) => { const toolUseID = options.toolUseID; if (options.signal.aborted) { return { @@ -1231,10 +1363,28 @@ export class PipelineExecutor { caseInsensitiveLookup: caseInsensitiveAllowLookup, }); if (!allowMatch) { - rulesEngine?.assertToolInvocationAllowed({ - tool: candidates[0] ?? toolName, - toolClearance: toolPolicy, - }); + if (rulesEngine) { + try { + rulesEngine.assertToolInvocationAllowed({ + tool: candidates[0] ?? toolName, + toolClearance: toolPolicy, + context: toolAuditContext, + }); + } catch (error) { + if ( + !(input.violationMode === "dangerous_warn_only" && error instanceof SecurityViolationError) + ) { + throw error; + } + } + } + if (input.violationMode === "dangerous_warn_only") { + return { + behavior: "allow", + updatedInput: toolInput, + ...(toolUseID ? { toolUseID } : {}), + }; + } return { behavior: "deny", message: `Tool "${toolName}" is not in the resolved execution allowlist.`, @@ -1246,10 +1396,12 @@ export class PipelineExecutor { rulesEngine?.assertToolInvocationAllowed({ tool: allowMatch, toolClearance: toolPolicy, + context: toolAuditContext, }); return { behavior: "allow", + updatedInput: toolInput, ...(toolUseID ? 
{ toolUseID } : {}), }; }; diff --git a/src/agents/project-context.ts b/src/agents/project-context.ts index 45d575a..b907a03 100644 --- a/src/agents/project-context.ts +++ b/src/agents/project-context.ts @@ -5,12 +5,23 @@ import { deepCloneJson, isRecord, type JsonObject, type JsonValue } from "./type export const PROJECT_CONTEXT_SCHEMA_VERSION = 1; -export type ProjectTaskStatus = "pending" | "in_progress" | "blocked" | "done"; +export type ProjectTaskStatus = + | "pending" + | "in_progress" + | "review" + | "conflict" + | "resolving_conflict" + | "merged" + | "failed" + | "blocked" + | "done"; export type ProjectTask = { - id: string; - title: string; + taskId: string; + id?: string; + title?: string; status: ProjectTaskStatus; + worktreePath?: string; assignee?: string; metadata?: JsonObject; }; @@ -52,7 +63,17 @@ function toJsonObject(value: unknown, label: string): JsonObject { } function toTaskStatus(value: unknown, label: string): ProjectTaskStatus { - if (value === "pending" || value === "in_progress" || value === "blocked" || value === "done") { + if ( + value === "pending" || + value === "in_progress" || + value === "review" || + value === "conflict" || + value === "resolving_conflict" || + value === "merged" || + value === "failed" || + value === "blocked" || + value === "done" + ) { return value; } throw new Error(`${label} has unsupported status "${String(value)}".`); @@ -68,10 +89,28 @@ function toProjectTask(value: unknown, label: string): ProjectTask { throw new Error(`${label}.assignee must be a non-empty string when provided.`); } + const taskIdCandidate = value.taskId ?? 
value.id; + const taskId = assertNonEmptyString(taskIdCandidate, `${label}.taskId`); + + const titleRaw = value.title; + if (titleRaw !== undefined && (typeof titleRaw !== "string" || titleRaw.trim().length === 0)) { + throw new Error(`${label}.title must be a non-empty string when provided.`); + } + + const worktreePathRaw = value.worktreePath; + if ( + worktreePathRaw !== undefined && + (typeof worktreePathRaw !== "string" || worktreePathRaw.trim().length === 0) + ) { + throw new Error(`${label}.worktreePath must be a non-empty string when provided.`); + } + return { - id: assertNonEmptyString(value.id, `${label}.id`), - title: assertNonEmptyString(value.title, `${label}.title`), + taskId, + id: taskId, + ...(typeof titleRaw === "string" ? { title: titleRaw.trim() } : {}), status: toTaskStatus(value.status, `${label}.status`), + ...(typeof worktreePathRaw === "string" ? { worktreePath: worktreePathRaw.trim() } : {}), ...(typeof assignee === "string" ? { assignee: assignee.trim() } : {}), ...(value.metadata !== undefined ? 
{ metadata: toJsonObject(value.metadata, `${label}.metadata`) } @@ -157,10 +196,10 @@ function mergeUpsertTasks(current: ProjectTask[], upserts: ProjectTask[]): Proje const byId = new Map(); for (const task of current) { - byId.set(task.id, task); + byId.set(task.taskId, task); } for (const task of upserts) { - byId.set(task.id, task); + byId.set(task.taskId, task); } return [...byId.values()]; diff --git a/src/agents/provider-executor.ts b/src/agents/provider-executor.ts index 7828ffa..64ebe96 100644 --- a/src/agents/provider-executor.ts +++ b/src/agents/provider-executor.ts @@ -9,14 +9,16 @@ import { import { isDomainEventType, type DomainEventEmission } from "../agents/domain-events.js"; import type { ActorExecutionInput, ActorExecutionResult, ActorExecutor } from "../agents/pipeline.js"; import { isRecord, type JsonObject, type JsonValue } from "../agents/types.js"; -import { createSessionContext, type SessionContext } from "../examples/session-context.js"; +import { ClaudeObservabilityLogger } from "../ui/claude-observability.js"; +import { z } from "zod"; export type RunProvider = "codex" | "claude"; export type ProviderRunRuntime = { provider: RunProvider; config: Readonly; - sessionContext: SessionContext; + sharedEnv: Record; + claudeObservability: ClaudeObservabilityLogger; close: () => Promise; }; @@ -28,6 +30,16 @@ type ProviderUsage = { costUsd?: number; }; +function sanitizeEnv(input: Record): Record { + const output: Record = {}; + for (const [key, value] of Object.entries(input)) { + if (typeof value === "string") { + output[key] = value; + } + } + return output; +} + const ACTOR_RESPONSE_SCHEMA = { type: "object", additionalProperties: true, @@ -72,10 +84,6 @@ const CLAUDE_OUTPUT_FORMAT = { schema: ACTOR_RESPONSE_SCHEMA, } as const; -const CLAUDE_PROVIDER_MAX_TURNS = 2; - -import { z } from "zod"; - const ActorResponseSchema = z.object({ status: z.enum(["success", "validation_fail", "failure"]), payload: z.unknown().optional(), @@ -85,7 +93,6 @@ 
const ActorResponseSchema = z.object({ failureKind: z.unknown().optional(), failureCode: z.unknown().optional(), }); - function toErrorMessage(error: unknown): string { if (error instanceof Error) { return error.message; @@ -93,6 +100,23 @@ function toErrorMessage(error: unknown): string { return String(error); } +export function resolveProviderWorkingDirectory(actorInput: ActorExecutionInput): string { + return actorInput.executionContext.security.worktreePath; +} + +export function buildProviderRuntimeEnv(input: { + runtime: ProviderRunRuntime; + actorInput: ActorExecutionInput; + includeClaudeAuth?: boolean; +}): Record { + const workingDirectory = resolveProviderWorkingDirectory(input.actorInput); + return sanitizeEnv({ + ...input.runtime.sharedEnv, + ...(input.includeClaudeAuth ? buildClaudeAuthEnv(input.runtime.config.provider) : {}), + AGENT_WORKTREE_PATH: workingDirectory, + }); +} + function toJsonValue(value: unknown): JsonValue { return JSON.parse(JSON.stringify(value)) as JsonValue; } @@ -338,7 +362,7 @@ function buildActorPrompt(input: ActorExecutionInput): string { }, events: [ { - type: "requirements_defined | tasks_planned | code_committed | task_blocked | validation_passed | validation_failed | branch_merged", + type: "requirements_defined | tasks_planned | code_committed | task_ready_for_review | task_blocked | validation_passed | validation_failed | branch_merged | merge_conflict_detected | merge_conflict_resolved | merge_conflict_unresolved | merge_retry_started", payload: { summary: "optional", details: {}, @@ -370,6 +394,7 @@ async function runCodexActor(input: { const prompt = buildActorPrompt(actorInput); const startedAt = Date.now(); const apiKey = resolveOpenAiApiKey(runtime.config.provider); + const workingDirectory = resolveProviderWorkingDirectory(actorInput); const codex = new Codex({ ...(apiKey ? { apiKey } : {}), @@ -379,20 +404,21 @@ async function runCodexActor(input: { ...(actorInput.mcp.resolvedConfig.codexConfig ? 
{ config: actorInput.mcp.resolvedConfig.codexConfig } : {}), - env: runtime.sessionContext.runtimeInjection.env, + env: buildProviderRuntimeEnv({ + runtime, + actorInput, + }), }); const thread = codex.startThread({ - workingDirectory: runtime.sessionContext.runtimeInjection.workingDirectory, + workingDirectory, skipGitRepoCheck: runtime.config.provider.codexSkipGitCheck, }); - const turn = await runtime.sessionContext.runInSession(() => - thread.run(prompt, { - signal: actorInput.signal, - outputSchema: ACTOR_RESPONSE_SCHEMA, - }), - ); + const turn = await thread.run(prompt, { + signal: actorInput.signal, + outputSchema: ACTOR_RESPONSE_SCHEMA, + }); const usage: ProviderUsage = { ...(turn.usage @@ -421,11 +447,46 @@ type ClaudeTurnResult = { usage: ProviderUsage; }; +function toClaudeTraceContext(actorInput: ActorExecutionInput): { + sessionId: string; + nodeId: string; + attempt: number; + depth: number; +} { + return { + sessionId: actorInput.sessionId, + nodeId: actorInput.node.id, + attempt: actorInput.attempt, + depth: actorInput.depth, + }; +} + +function toProviderUsageJson(usage: ProviderUsage): JsonObject { + const output: JsonObject = {}; + if (typeof usage.tokenInput === "number") { + output.tokenInput = usage.tokenInput; + } + if (typeof usage.tokenOutput === "number") { + output.tokenOutput = usage.tokenOutput; + } + if (typeof usage.tokenTotal === "number") { + output.tokenTotal = usage.tokenTotal; + } + if (typeof usage.durationMs === "number") { + output.durationMs = usage.durationMs; + } + if (typeof usage.costUsd === "number") { + output.costUsd = usage.costUsd; + } + return output; +} + function buildClaudeOptions(input: { runtime: ProviderRunRuntime; actorInput: ActorExecutionInput; }): Options { const { runtime, actorInput } = input; + const workingDirectory = resolveProviderWorkingDirectory(actorInput); const authOptionOverrides = runtime.config.provider.anthropicOauthToken ? 
{ authToken: runtime.config.provider.anthropicOauthToken } @@ -434,13 +495,15 @@ function buildClaudeOptions(input: { return token ? { apiKey: token } : {}; })(); - const runtimeEnv = { - ...runtime.sessionContext.runtimeInjection.env, - ...buildClaudeAuthEnv(runtime.config.provider), - }; + const runtimeEnv = buildProviderRuntimeEnv({ + runtime, + actorInput, + includeClaudeAuth: true, + }); + const traceContext = toClaudeTraceContext(actorInput); return { - maxTurns: CLAUDE_PROVIDER_MAX_TURNS, + maxTurns: runtime.config.provider.claudeMaxTurns, ...(runtime.config.provider.claudeModel ? { model: runtime.config.provider.claudeModel } : {}), @@ -452,8 +515,11 @@ function buildClaudeOptions(input: { ? { mcpServers: actorInput.mcp.resolvedConfig.claudeMcpServers as Options["mcpServers"] } : {}), canUseTool: actorInput.mcp.createClaudeCanUseTool(), - cwd: runtime.sessionContext.runtimeInjection.workingDirectory, + cwd: workingDirectory, env: runtimeEnv, + ...runtime.claudeObservability.toOptionOverrides({ + context: traceContext, + }), outputFormat: CLAUDE_OUTPUT_FORMAT, }; } @@ -463,10 +529,19 @@ async function runClaudeTurn(input: { actorInput: ActorExecutionInput; prompt: string; }): Promise { + const traceContext = toClaudeTraceContext(input.actorInput); const options = buildClaudeOptions({ runtime: input.runtime, actorInput: input.actorInput, }); + input.runtime.claudeObservability.recordQueryStarted({ + context: traceContext, + data: { + ...(options.model ? { model: options.model } : {}), + maxTurns: options.maxTurns ?? input.runtime.config.provider.claudeMaxTurns, + ...(typeof options.cwd === "string" ? 
{ cwd: options.cwd } : {}), + }, + }); const startedAt = Date.now(); const stream = query({ @@ -477,6 +552,7 @@ async function runClaudeTurn(input: { let resultText = ""; let structuredOutput: unknown; let usage: ProviderUsage = {}; + let messageCount = 0; const onAbort = (): void => { stream.close(); @@ -486,6 +562,12 @@ async function runClaudeTurn(input: { try { for await (const message of stream as AsyncIterable) { + messageCount += 1; + input.runtime.claudeObservability.recordMessage({ + context: traceContext, + message, + }); + if (message.type !== "result") { continue; } @@ -507,6 +589,12 @@ async function runClaudeTurn(input: { costUsd: message.total_cost_usd, }; } + } catch (error) { + input.runtime.claudeObservability.recordQueryError({ + context: traceContext, + error, + }); + throw error; } finally { input.actorInput.signal.removeEventListener("abort", onAbort); stream.close(); @@ -517,9 +605,22 @@ async function runClaudeTurn(input: { } if (!resultText) { - throw new Error("Claude run completed without a final result."); + const error = new Error("Claude run completed without a final result."); + input.runtime.claudeObservability.recordQueryError({ + context: traceContext, + error, + }); + throw error; } + input.runtime.claudeObservability.recordQueryCompleted({ + context: traceContext, + data: { + messageCount, + usage: toProviderUsageJson(usage), + }, + }); + return { text: resultText, structuredOutput, @@ -535,13 +636,11 @@ async function runClaudeActor(input: { actorInput: ActorExecutionInput; }): Promise { const prompt = buildActorPrompt(input.actorInput); - const turn = await input.runtime.sessionContext.runInSession(() => - runClaudeTurn({ - runtime: input.runtime, - actorInput: input.actorInput, - prompt, - }), - ); + const turn = await runClaudeTurn({ + runtime: input.runtime, + actorInput: input.actorInput, + prompt, + }); const parsed = parseActorExecutionResultFromModelOutput({ rawText: turn.text, @@ -556,21 +655,21 @@ async function 
runClaudeActor(input: { export async function createProviderRunRuntime(input: { provider: RunProvider; - initialPrompt: string; config: Readonly; + observabilityRootPath?: string; + baseEnv?: Record; }): Promise { - const sessionContext = await createSessionContext(input.provider, { - prompt: input.initialPrompt, - config: input.config, + const claudeObservability = new ClaudeObservabilityLogger({ + workspaceRoot: input.observabilityRootPath ?? process.cwd(), + config: input.config.provider.claudeObservability, }); return { provider: input.provider, config: input.config, - sessionContext, - close: async () => { - await sessionContext.close(); - }, + sharedEnv: sanitizeEnv(input.baseEnv ?? process.env), + claudeObservability, + close: async () => claudeObservability.close(), }; } diff --git a/src/agents/provisioning.ts b/src/agents/provisioning.ts index 1569e21..132b0c8 100644 --- a/src/agents/provisioning.ts +++ b/src/agents/provisioning.ts @@ -197,9 +197,9 @@ export class ResourceProvisioningOrchestrator { async provisionSession(input: { sessionId: string; resources: ResourceRequest[]; - workspaceRoot?: string; + workspaceRoot: string; }): Promise { - const workspaceRoot = resolve(input.workspaceRoot ?? 
process.cwd()); + const workspaceRoot = resolve(input.workspaceRoot); const hardConstraints: ProvisionedResourcesState["hardConstraints"] = []; const releases: ProvisionedResourcesState["releases"] = []; const env: Record = {}; diff --git a/src/agents/session-lifecycle.ts b/src/agents/session-lifecycle.ts new file mode 100644 index 0000000..33364fe --- /dev/null +++ b/src/agents/session-lifecycle.ts @@ -0,0 +1,872 @@ +import { execFile } from "node:child_process"; +import { randomUUID } from "node:crypto"; +import { mkdir, readFile, readdir, stat } from "node:fs/promises"; +import { dirname, isAbsolute, resolve } from "node:path"; +import { promisify } from "node:util"; +import { withFileLock, writeUtf8FileAtomic } from "./file-persistence.js"; + +const execFileAsync = promisify(execFile); + +const SESSION_METADATA_FILE_NAME = "session-metadata.json"; + +export type SessionStatus = "active" | "suspended" | "closed" | "closed_with_conflicts"; + +export type SessionMetadata = { + sessionId: string; + projectPath: string; + sessionStatus: SessionStatus; + baseWorkspacePath: string; + createdAt: string; + updatedAt: string; +}; + +export type CreateSessionRequest = { + projectPath: string; +}; + +export type MergeTaskIntoBaseOutcome = + | { + kind: "success"; + taskId: string; + worktreePath: string; + baseWorkspacePath: string; + } + | { + kind: "conflict"; + taskId: string; + worktreePath: string; + baseWorkspacePath: string; + conflictFiles: string[]; + mergeBase?: string; + } + | { + kind: "fatal_error"; + taskId: string; + worktreePath: string; + baseWorkspacePath: string; + error: string; + mergeBase?: string; + }; + +export type CloseSessionOutcome = + | { + kind: "success"; + sessionId: string; + mergedToProject: boolean; + } + | { + kind: "conflict"; + sessionId: string; + worktreePath: string; + conflictFiles: string[]; + mergeBase?: string; + baseBranch?: string; + } + | { + kind: "fatal_error"; + sessionId: string; + error: string; + baseBranch?: string; + 
mergeBase?: string; + }; + +type GitExecutionResult = { + exitCode: number; + stdout: string; + stderr: string; +}; + +type GitWorktreeRecord = { + path: string; + branchRef?: string; +}; + +function toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + return String(error); +} + +function assertAbsolutePath(path: string, label: string): string { + if (!isAbsolute(path)) { + throw new Error(`${label} must be an absolute path.`); + } + return resolve(path); +} + +function normalizeWorktreePath(path: string): string { + const normalized = resolve(path); + return normalized.startsWith("/private/var/") ? normalized.slice("/private".length) : normalized; +} + +function assertNonEmptyString(value: unknown, label: string): string { + if (typeof value !== "string" || value.trim().length === 0) { + throw new Error(`${label} must be a non-empty string.`); + } + return value.trim(); +} + +function toSessionStatus(value: unknown): SessionStatus { + if ( + value === "active" || + value === "suspended" || + value === "closed" || + value === "closed_with_conflicts" + ) { + return value; + } + throw new Error(`Session status "${String(value)}" is not supported.`); +} + +function toSessionMetadata(value: unknown): SessionMetadata { + if (!value || typeof value !== "object" || Array.isArray(value)) { + throw new Error("Session metadata file is malformed."); + } + + const raw = value as Record; + + return { + sessionId: assertNonEmptyString(raw.sessionId, "sessionId"), + projectPath: assertAbsolutePath(assertNonEmptyString(raw.projectPath, "projectPath"), "projectPath"), + baseWorkspacePath: assertAbsolutePath( + assertNonEmptyString(raw.baseWorkspacePath, "baseWorkspacePath"), + "baseWorkspacePath", + ), + sessionStatus: toSessionStatus(raw.sessionStatus), + createdAt: assertNonEmptyString(raw.createdAt, "createdAt"), + updatedAt: assertNonEmptyString(raw.updatedAt, "updatedAt"), + }; +} + +async function runGit(args: string[]): 
Promise { + const result = await runGitWithResult(args); + if (result.exitCode !== 0) { + throw new Error(`git ${args.join(" ")} failed: ${result.stderr || result.stdout || "unknown git error"}`); + } + return result.stdout.trim(); +} + +async function runGitWithResult(args: string[]): Promise { + try { + const { stdout, stderr } = await execFileAsync("git", args, { + encoding: "utf8", + }); + return { + exitCode: 0, + stdout: stdout.trim(), + stderr: stderr.trim(), + }; + } catch (error) { + const failure = error as { + code?: number | string; + stdout?: string; + stderr?: string; + }; + if (typeof failure.code === "number") { + return { + exitCode: failure.code, + stdout: String(failure.stdout ?? "").trim(), + stderr: String(failure.stderr ?? "").trim(), + }; + } + throw new Error(`git ${args.join(" ")} failed: ${toErrorMessage(error)}`); + } +} + +async function pathExists(path: string): Promise { + try { + await stat(path); + return true; + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return false; + } + throw error; + } +} + +function sanitizeSegment(value: string, fallback: string): string { + const normalized = value + .trim() + .replace(/[^a-zA-Z0-9_-]/g, "-") + .replace(/-+/g, "-") + .replace(/^-+/, "") + .replace(/-+$/, ""); + return normalized || fallback; +} + +function toGitFailureMessage(result: GitExecutionResult): string { + const details = result.stderr || result.stdout || "unknown git error"; + return `git command failed with exit code ${String(result.exitCode)}: ${details}`; +} + +function toStringLines(value: string): string[] { + return value + .split("\n") + .map((line) => line.trim()) + .filter((line) => line.length > 0); +} + +function parseGitWorktreeRecords(value: string): GitWorktreeRecord[] { + const lines = value.split("\n"); + const records: GitWorktreeRecord[] = []; + let current: GitWorktreeRecord | undefined; + + for (const line of lines) { + if (!line.trim()) { + if (current) { + 
records.push(current); + current = undefined; + } + continue; + } + if (line.startsWith("worktree ")) { + if (current) { + records.push(current); + } + current = { + path: line.slice("worktree ".length).trim(), + }; + continue; + } + if (line.startsWith("branch ") && current) { + current.branchRef = line.slice("branch ".length).trim(); + } + } + + if (current) { + records.push(current); + } + + return records; +} + +export class FileSystemSessionMetadataStore { + private readonly stateRoot: string; + + constructor(input: { stateRoot: string }) { + this.stateRoot = resolve(input.stateRoot); + } + + getStateRoot(): string { + return this.stateRoot; + } + + getSessionDirectory(sessionId: string): string { + return resolve(this.stateRoot, sessionId); + } + + getSessionMetadataPath(sessionId: string): string { + return resolve(this.getSessionDirectory(sessionId), SESSION_METADATA_FILE_NAME); + } + + getSessionProjectContextPath(sessionId: string): string { + return resolve(this.getSessionDirectory(sessionId), "project-context.json"); + } + + async createSession(input: { + projectPath: string; + baseWorkspacePath: string; + sessionId?: string; + }): Promise { + const sessionId = input.sessionId?.trim() || randomUUID(); + const now = new Date().toISOString(); + const metadata: SessionMetadata = { + sessionId, + projectPath: assertAbsolutePath(input.projectPath, "projectPath"), + baseWorkspacePath: assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath"), + sessionStatus: "active", + createdAt: now, + updatedAt: now, + }; + + const sessionDirectory = this.getSessionDirectory(sessionId); + await mkdir(sessionDirectory, { recursive: true }); + await this.writeSessionMetadata(metadata); + + return metadata; + } + + async readSession(sessionId: string): Promise { + const metadataPath = this.getSessionMetadataPath(sessionId); + + try { + const content = await readFile(metadataPath, "utf8"); + return toSessionMetadata(JSON.parse(content) as unknown); + } catch (error) { 
+ if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return undefined; + } + throw error; + } + } + + async listSessions(): Promise { + try { + const entries = await readdir(this.stateRoot, { withFileTypes: true }); + const sessions: SessionMetadata[] = []; + + for (const entry of entries) { + if (!entry.isDirectory()) { + continue; + } + + const metadata = await this.readSession(entry.name); + if (metadata) { + sessions.push(metadata); + } + } + + sessions.sort((left, right) => right.createdAt.localeCompare(left.createdAt)); + return sessions; + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return []; + } + throw error; + } + } + + async updateSession( + sessionId: string, + patch: Partial>, + ): Promise { + const current = await this.readSession(sessionId); + if (!current) { + throw new Error(`Session "${sessionId}" does not exist.`); + } + + const next: SessionMetadata = { + ...current, + ...(patch.projectPath ? { projectPath: assertAbsolutePath(patch.projectPath, "projectPath") } : {}), + ...(patch.baseWorkspacePath + ? { baseWorkspacePath: assertAbsolutePath(patch.baseWorkspacePath, "baseWorkspacePath") } + : {}), + ...(patch.sessionStatus ? 
{ sessionStatus: patch.sessionStatus } : {}), + updatedAt: new Date().toISOString(), + }; + + await this.writeSessionMetadata(next); + return next; + } + + private async writeSessionMetadata(metadata: SessionMetadata): Promise { + const metadataPath = this.getSessionMetadataPath(metadata.sessionId); + await mkdir(dirname(metadataPath), { recursive: true }); + await withFileLock(`${metadataPath}.lock`, async () => { + await writeUtf8FileAtomic(metadataPath, `${JSON.stringify(metadata, null, 2)}\n`); + }); + } +} + +export class SessionWorktreeManager { + private readonly worktreeRoot: string; + private readonly baseRef: string; + private readonly targetPath?: string; + + constructor(input: { + worktreeRoot: string; + baseRef: string; + targetPath?: string; + }) { + this.worktreeRoot = assertAbsolutePath(input.worktreeRoot, "worktreeRoot"); + this.baseRef = assertNonEmptyString(input.baseRef, "baseRef"); + this.targetPath = normalizeWorktreeTargetPath(input.targetPath, "targetPath"); + } + + resolveBaseWorkspacePath(sessionId: string): string { + const scoped = sanitizeSegment(sessionId, "session"); + return resolve(this.worktreeRoot, scoped, "base"); + } + + resolveTaskWorktreePath(sessionId: string, taskId: string): string { + const scopedSession = sanitizeSegment(sessionId, "session"); + const scopedTask = sanitizeSegment(taskId, "task"); + return resolve(this.worktreeRoot, scopedSession, "tasks", scopedTask); + } + + resolveWorkingDirectoryForWorktree(worktreePath: string): string { + const normalizedWorktreePath = assertAbsolutePath(worktreePath, "worktreePath"); + return this.targetPath ? 
resolve(normalizedWorktreePath, this.targetPath) : normalizedWorktreePath; + } + + private resolveBaseBranchName(sessionId: string): string { + const scoped = sanitizeSegment(sessionId, "session"); + return `ai-ops/${scoped}/base`; + } + + private resolveTaskBranchName(sessionId: string, taskId: string): string { + const scopedSession = sanitizeSegment(sessionId, "session"); + const scopedTask = sanitizeSegment(taskId, "task"); + return `ai-ops/${scopedSession}/task/${scopedTask}`; + } + + async initializeSessionBaseWorkspace(input: { + sessionId: string; + projectPath: string; + baseWorkspacePath: string; + }): Promise { + const projectPath = assertAbsolutePath(input.projectPath, "projectPath"); + const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath"); + + await mkdir(dirname(baseWorkspacePath), { recursive: true }); + + if (!(await pathExists(baseWorkspacePath))) { + const repoRoot = await runGit(["-C", projectPath, "rev-parse", "--show-toplevel"]); + const branchName = this.resolveBaseBranchName(input.sessionId); + await runGit(["-C", repoRoot, "worktree", "add", "-B", branchName, baseWorkspacePath, this.baseRef]); + } + + await this.ensureWorktreeTargetPath(baseWorkspacePath); + } + + async ensureTaskWorktree(input: { + sessionId: string; + taskId: string; + baseWorkspacePath: string; + existingWorktreePath?: string; + }): Promise<{ + taskWorktreePath: string; + taskWorkingDirectory: string; + }> { + const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath"); + const maybeExisting = input.existingWorktreePath?.trim(); + const worktreePath = maybeExisting + ? 
assertAbsolutePath(maybeExisting, "existingWorktreePath") + : this.resolveTaskWorktreePath(input.sessionId, input.taskId); + const branchName = this.resolveTaskBranchName(input.sessionId, input.taskId); + const attachedWorktree = await this.findWorktreePathForBranch(baseWorkspacePath, branchName); + + const normalizedWorktreePath = normalizeWorktreePath(worktreePath); + const normalizedAttachedWorktree = attachedWorktree ? normalizeWorktreePath(attachedWorktree) : undefined; + + if (normalizedAttachedWorktree && normalizedAttachedWorktree !== normalizedWorktreePath) { + throw new Error( + `Task branch "${branchName}" is already attached to worktree "${attachedWorktree}", ` + + `expected "${worktreePath}".`, + ); + } + + if (!(await pathExists(worktreePath))) { + await runGit(["-C", baseWorkspacePath, "worktree", "prune", "--expire", "now"]); + } + + if (!(await pathExists(worktreePath))) { + await mkdir(dirname(worktreePath), { recursive: true }); + const addResult = await runGitWithResult([ + "-C", + baseWorkspacePath, + "worktree", + "add", + "-B", + branchName, + worktreePath, + "HEAD", + ]); + if (addResult.exitCode !== 0) { + const attachedAfterFailure = await this.findWorktreePathForBranch(baseWorkspacePath, branchName); + if ( + attachedAfterFailure && + normalizeWorktreePath(attachedAfterFailure) === normalizedWorktreePath && + (await pathExists(worktreePath)) + ) { + const taskWorkingDirectory = await this.ensureWorktreeTargetPath(worktreePath); + return { + taskWorktreePath: worktreePath, + taskWorkingDirectory, + }; + } + throw new Error( + `git -C ${baseWorkspacePath} worktree add -B ${branchName} ${worktreePath} HEAD failed: ` + + `${toGitFailureMessage(addResult)}`, + ); + } + } + + const taskWorkingDirectory = await this.ensureWorktreeTargetPath(worktreePath); + return { + taskWorktreePath: worktreePath, + taskWorkingDirectory, + }; + } + + async mergeTaskIntoBase(input: { + taskId: string; + baseWorkspacePath: string; + taskWorktreePath: string; + 
}): Promise { + const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath"); + const taskWorktreePath = assertAbsolutePath(input.taskWorktreePath, "taskWorktreePath"); + const taskId = input.taskId; + + if (!(await pathExists(baseWorkspacePath))) { + throw new Error(`Base workspace "${baseWorkspacePath}" does not exist.`); + } + if (!(await pathExists(taskWorktreePath))) { + throw new Error(`Task worktree "${taskWorktreePath}" does not exist.`); + } + + let mergeBase: string | undefined; + try { + await runGit(["-C", taskWorktreePath, "add", "-A"]); + const hasPending = await this.hasStagedChanges(taskWorktreePath); + if (hasPending) { + await runGit(["-C", taskWorktreePath, "commit", "-m", `ai_ops: finalize task ${taskId}`]); + } + + const branchName = await runGit(["-C", taskWorktreePath, "rev-parse", "--abbrev-ref", "HEAD"]); + const baseBranch = await runGit(["-C", baseWorkspacePath, "rev-parse", "--abbrev-ref", "HEAD"]); + mergeBase = await this.tryReadMergeBase(baseWorkspacePath, baseBranch, branchName); + + if (await this.hasOngoingMerge(taskWorktreePath)) { + return { + kind: "conflict", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + conflictFiles: await this.readConflictFiles(taskWorktreePath), + ...(mergeBase ? { mergeBase } : {}), + }; + } + + const syncTaskBranch = await runGitWithResult([ + "-C", + taskWorktreePath, + "merge", + "--no-ff", + "--no-edit", + baseBranch, + ]); + + if (syncTaskBranch.exitCode === 1) { + return { + kind: "conflict", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + conflictFiles: await this.readConflictFiles(taskWorktreePath), + ...(mergeBase ? { mergeBase } : {}), + }; + } + if (syncTaskBranch.exitCode !== 0) { + return { + kind: "fatal_error", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + error: toGitFailureMessage(syncTaskBranch), + ...(mergeBase ? 
{ mergeBase } : {}), + }; + } + + if (await this.hasOngoingMerge(baseWorkspacePath)) { + return { + kind: "conflict", + taskId, + worktreePath: baseWorkspacePath, + baseWorkspacePath, + conflictFiles: await this.readConflictFiles(baseWorkspacePath), + ...(mergeBase ? { mergeBase } : {}), + }; + } + + const mergeIntoBase = await runGitWithResult([ + "-C", + baseWorkspacePath, + "merge", + "--no-ff", + "--no-edit", + branchName, + ]); + + if (mergeIntoBase.exitCode === 1) { + return { + kind: "conflict", + taskId, + worktreePath: baseWorkspacePath, + baseWorkspacePath, + conflictFiles: await this.readConflictFiles(baseWorkspacePath), + ...(mergeBase ? { mergeBase } : {}), + }; + } + if (mergeIntoBase.exitCode !== 0) { + return { + kind: "fatal_error", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + error: toGitFailureMessage(mergeIntoBase), + ...(mergeBase ? { mergeBase } : {}), + }; + } + + await this.removeWorktree({ + repoPath: baseWorkspacePath, + worktreePath: taskWorktreePath, + }); + + return { + kind: "success", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + }; + } catch (error) { + return { + kind: "fatal_error", + taskId, + worktreePath: taskWorktreePath, + baseWorkspacePath, + error: toErrorMessage(error), + ...(mergeBase ? 
{ mergeBase } : {}), + }; + } + } + + async closeSession(input: { + session: SessionMetadata; + taskWorktreePaths: string[]; + mergeBaseIntoProject?: boolean; + }): Promise { + const projectPath = assertAbsolutePath(input.session.projectPath, "projectPath"); + const baseWorkspacePath = assertAbsolutePath(input.session.baseWorkspacePath, "baseWorkspacePath"); + if (!(await pathExists(projectPath))) { + throw new Error(`Project path "${projectPath}" does not exist.`); + } + if (!(await pathExists(baseWorkspacePath))) { + throw new Error(`Base workspace "${baseWorkspacePath}" does not exist.`); + } + + let baseBranch: string | undefined; + let mergeBase: string | undefined; + + try { + for (const taskWorktreePath of input.taskWorktreePaths) { + if (!taskWorktreePath.trim()) { + continue; + } + + await this.removeWorktree({ + repoPath: baseWorkspacePath, + worktreePath: taskWorktreePath, + }); + } + + if (input.mergeBaseIntoProject) { + baseBranch = await runGit(["-C", baseWorkspacePath, "rev-parse", "--abbrev-ref", "HEAD"]); + mergeBase = await this.tryReadMergeBase(projectPath, "HEAD", baseBranch); + + if (await this.hasOngoingMerge(projectPath)) { + return { + kind: "conflict", + sessionId: input.session.sessionId, + worktreePath: projectPath, + conflictFiles: await this.readConflictFiles(projectPath), + ...(baseBranch ? { baseBranch } : {}), + ...(mergeBase ? { mergeBase } : {}), + }; + } + + const mergeResult = await runGitWithResult([ + "-C", + projectPath, + "merge", + "--no-ff", + "--no-edit", + baseBranch, + ]); + if (mergeResult.exitCode === 1) { + return { + kind: "conflict", + sessionId: input.session.sessionId, + worktreePath: projectPath, + conflictFiles: await this.readConflictFiles(projectPath), + ...(baseBranch ? { baseBranch } : {}), + ...(mergeBase ? { mergeBase } : {}), + }; + } + if (mergeResult.exitCode !== 0) { + return { + kind: "fatal_error", + sessionId: input.session.sessionId, + error: toGitFailureMessage(mergeResult), + ...(baseBranch ? 
{ baseBranch } : {}), + ...(mergeBase ? { mergeBase } : {}), + }; + } + } + + await this.removeWorktree({ + repoPath: projectPath, + worktreePath: baseWorkspacePath, + }); + + return { + kind: "success", + sessionId: input.session.sessionId, + mergedToProject: input.mergeBaseIntoProject === true, + }; + } catch (error) { + return { + kind: "fatal_error", + sessionId: input.session.sessionId, + error: toErrorMessage(error), + ...(baseBranch ? { baseBranch } : {}), + ...(mergeBase ? { mergeBase } : {}), + }; + } + } + + private async removeWorktree(input: { + repoPath: string; + worktreePath: string; + }): Promise { + if (!(await pathExists(input.worktreePath))) { + return; + } + + await runGit(["-C", input.repoPath, "worktree", "remove", "--force", input.worktreePath]); + await runGit(["-C", input.repoPath, "worktree", "prune"]); + } + + private async hasStagedChanges(worktreePath: string): Promise { + try { + await execFileAsync("git", ["-C", worktreePath, "diff", "--cached", "--quiet"], { + encoding: "utf8", + }); + return false; + } catch (error) { + const exitCode = (error as { code?: number }).code; + if (exitCode === 1) { + return true; + } + throw new Error(`Unable to inspect staged changes: ${toErrorMessage(error)}`); + } + } + + private async hasOngoingMerge(worktreePath: string): Promise { + const result = await runGitWithResult([ + "-C", + worktreePath, + "rev-parse", + "-q", + "--verify", + "MERGE_HEAD", + ]); + return result.exitCode === 0; + } + + private async readConflictFiles(worktreePath: string): Promise { + const result = await runGitWithResult([ + "-C", + worktreePath, + "diff", + "--name-only", + "--diff-filter=U", + ]); + if (result.exitCode !== 0) { + return []; + } + return toStringLines(result.stdout); + } + + private async tryReadMergeBase( + repoPath: string, + leftRef: string, + rightRef: string, + ): Promise { + const result = await runGitWithResult(["-C", repoPath, "merge-base", leftRef, rightRef]); + if (result.exitCode !== 0) { + 
return undefined; + } + const mergeBase = result.stdout.trim(); + return mergeBase || undefined; + } + + private async findWorktreePathForBranch( + repoPath: string, + branchName: string, + ): Promise { + const branchRef = `refs/heads/${branchName}`; + const records = await this.listWorktreeRecords(repoPath); + const matched = records.find((record) => record.branchRef === branchRef); + if (!matched) { + return undefined; + } + return resolve(matched.path); + } + + private async listWorktreeRecords(repoPath: string): Promise { + const result = await runGitWithResult(["-C", repoPath, "worktree", "list", "--porcelain"]); + if (result.exitCode !== 0) { + return []; + } + return parseGitWorktreeRecords(result.stdout); + } + + private async ensureWorktreeTargetPath(worktreePath: string): Promise { + if (this.targetPath) { + await runGit(["-C", worktreePath, "sparse-checkout", "init", "--cone"]); + await runGit(["-C", worktreePath, "sparse-checkout", "set", this.targetPath]); + } + + const workingDirectory = this.resolveWorkingDirectoryForWorktree(worktreePath); + let workingDirectoryStats; + try { + workingDirectoryStats = await stat(workingDirectory); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + if (this.targetPath) { + throw new Error( + `Configured worktree target path "${this.targetPath}" is not a directory in ref "${this.baseRef}".`, + ); + } + throw new Error(`Worktree path "${workingDirectory}" does not exist.`); + } + throw error; + } + + if (!workingDirectoryStats.isDirectory()) { + if (this.targetPath) { + throw new Error( + `Configured worktree target path "${this.targetPath}" is not a directory in ref "${this.baseRef}".`, + ); + } + throw new Error(`Worktree path "${workingDirectory}" is not a directory.`); + } + + return workingDirectory; + } +} + +function normalizeWorktreeTargetPath(value: string | undefined, key: string): string | undefined { + if (value === undefined) { + return undefined; + } + + const trimmed = 
value.trim(); + if (trimmed.length === 0) { + return undefined; + } + + const slashNormalized = trimmed.replace(/\\/g, "/"); + if (isAbsolute(slashNormalized) || /^[a-zA-Z]:\//.test(slashNormalized)) { + throw new Error(`${key} must be a relative path within the repository worktree.`); + } + + const normalizedSegments = slashNormalized + .split("/") + .map((segment) => segment.trim()) + .filter((segment) => segment.length > 0 && segment !== "."); + + if (normalizedSegments.some((segment) => segment === "..")) { + throw new Error(`${key} must not contain ".." path segments.`); + } + + if (normalizedSegments.length === 0) { + return undefined; + } + + return normalizedSegments.join("/"); +} diff --git a/src/config.ts b/src/config.ts index a914e7b..785f6fe 100644 --- a/src/config.ts +++ b/src/config.ts @@ -16,9 +16,22 @@ export type ProviderRuntimeConfig = { anthropicApiKey?: string; claudeModel?: string; claudeCodePath?: string; + claudeMaxTurns: number; + claudeObservability: ClaudeObservabilityRuntimeConfig; }; export type OpenAiAuthMode = "auto" | "chatgpt" | "api_key"; +export type ClaudeObservabilityMode = "off" | "stdout" | "file" | "both"; +export type ClaudeObservabilityVerbosity = "summary" | "full"; + +export type ClaudeObservabilityRuntimeConfig = { + mode: ClaudeObservabilityMode; + verbosity: ClaudeObservabilityVerbosity; + logPath: string; + includePartialMessages: boolean; + debug: boolean; + debugLogPath?: string; +}; export type McpRuntimeConfig = { configPath: string; @@ -30,6 +43,7 @@ export type OrchestrationRuntimeConfig = { maxDepth: number; maxRetries: number; maxChildren: number; + mergeConflictMaxAttempts: number; }; export type DiscoveryRuntimeConfig = { @@ -77,6 +91,7 @@ const DEFAULT_ORCHESTRATION: OrchestrationRuntimeConfig = { maxDepth: 4, maxRetries: 2, maxChildren: 4, + mergeConflictMaxAttempts: 2, }; const DEFAULT_PROVISIONING: BuiltInProvisioningConfig = { @@ -113,6 +128,17 @@ const DEFAULT_RUNTIME_EVENTS: RuntimeEventRuntimeConfig = 
{ discordAlwaysNotifyTypes: ["session.started", "session.completed", "session.failed"], }; +const DEFAULT_CLAUDE_OBSERVABILITY: ClaudeObservabilityRuntimeConfig = { + mode: "off", + verbosity: "summary", + logPath: ".ai_ops/events/claude-trace.ndjson", + includePartialMessages: false, + debug: false, + debugLogPath: undefined, +}; + +const DEFAULT_CLAUDE_MAX_TURNS = 2; + function readOptionalString( env: NodeJS.ProcessEnv, key: string, @@ -272,6 +298,26 @@ function parseOpenAiAuthMode(raw: string): OpenAiAuthMode { ); } +function parseClaudeObservabilityMode(raw: string): ClaudeObservabilityMode { + if (raw === "off" || raw === "stdout" || raw === "file" || raw === "both") { + return raw; + } + + throw new Error( + 'Environment variable CLAUDE_OBSERVABILITY_MODE must be one of: "off", "stdout", "file", "both".', + ); +} + +function parseClaudeObservabilityVerbosity(raw: string): ClaudeObservabilityVerbosity { + if (raw === "summary" || raw === "full") { + return raw; + } + + throw new Error( + 'Environment variable CLAUDE_OBSERVABILITY_VERBOSITY must be one of: "summary", "full".', + ); +} + function deepFreeze(value: T): Readonly { if (value === null || typeof value !== "object") { return value; @@ -358,6 +404,44 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Readonly console.log(output)); const sessionContext = await createSessionContextFn("claude", { prompt, + workspaceRoot: process.cwd(), config, }); diff --git a/src/examples/codex.ts b/src/examples/codex.ts index 711d965..e016559 100644 --- a/src/examples/codex.ts +++ b/src/examples/codex.ts @@ -48,6 +48,7 @@ export async function runCodexPrompt( const writeOutput = dependencies.writeOutput ?? 
((output: string) => console.log(output)); const sessionContext = await createSessionContextFn("codex", { prompt, + workspaceRoot: process.cwd(), config, }); diff --git a/src/examples/session-context.ts b/src/examples/session-context.ts index 0c398f6..04be601 100644 --- a/src/examples/session-context.ts +++ b/src/examples/session-context.ts @@ -28,6 +28,7 @@ export async function createSessionContext( provider: SessionProvider, input: { prompt: string; + workspaceRoot: string; config?: Readonly; mcpRegistry?: McpRegistry; }, @@ -58,6 +59,7 @@ export async function createSessionContext( provisionedResources = await resourceProvisioning.provisionSession({ sessionId: agentSession.id, resources: [{ kind: "git-worktree" }, { kind: "port-range" }], + workspaceRoot: input.workspaceRoot, }); const providerAuthEnv = @@ -82,6 +84,7 @@ export async function createSessionContext( { providerHint: provider, prompt: input.prompt, + workingDirectory: runtimeInjection.workingDirectory, }, { config, diff --git a/src/mcp.ts b/src/mcp.ts index ccb59e0..4ab88aa 100644 --- a/src/mcp.ts +++ b/src/mcp.ts @@ -1,5 +1,5 @@ import { existsSync, readFileSync } from "node:fs"; -import { resolve } from "node:path"; +import { isAbsolute, resolve } from "node:path"; import type { CodexOptions } from "@openai/codex-sdk"; import { getConfig, type AppConfig } from "./config.js"; import { normalizeSharedMcpConfigFile } from "./mcp/converters.js"; @@ -23,12 +23,17 @@ import type { import { parseMcpConfig } from "./mcp/types.js"; import type { ToolClearancePolicy } from "./security/schemas.js"; -function readConfigFile(configPath: string): { +function readConfigFile(input: { + configPath: string; + workingDirectory?: string; +}): { config?: SharedMcpConfigFile; sourcePath?: string; } { - const candidatePath = configPath.trim() || "./mcp.config.json"; - const resolvedPath = resolve(process.cwd(), candidatePath); + const candidatePath = input.configPath.trim() || "./mcp.config.json"; + const resolvedPath 
= isAbsolute(candidatePath) + ? candidatePath + : resolve(input.workingDirectory ?? process.cwd(), candidatePath); if (!existsSync(resolvedPath)) { if (candidatePath !== "./mcp.config.json") { @@ -83,7 +88,10 @@ export function loadMcpConfigFromEnv( const registry = options?.registry ?? defaultMcpRegistry; const warn = options?.warn ?? ((message: string) => console.warn(message)); - const { config, sourcePath } = readConfigFile(runtimeConfig.mcp.configPath); + const { config, sourcePath } = readConfigFile({ + configPath: runtimeConfig.mcp.configPath, + workingDirectory: context.workingDirectory, + }); if (!config) { return {}; } diff --git a/src/mcp/types.ts b/src/mcp/types.ts index 597a711..9016c4b 100644 --- a/src/mcp/types.ts +++ b/src/mcp/types.ts @@ -50,6 +50,7 @@ export type SharedMcpConfigFile = { export type McpLoadContext = { providerHint?: "codex" | "claude" | "both"; prompt?: string; + workingDirectory?: string; }; export type LoadedMcpConfig = { diff --git a/src/runs/run-service.ts b/src/runs/run-service.ts index 06c3370..da4d6c4 100644 --- a/src/runs/run-service.ts +++ b/src/runs/run-service.ts @@ -1,13 +1,20 @@ import { randomUUID } from "node:crypto"; import { mkdir, readFile, writeFile } from "node:fs/promises"; import { resolve } from "node:path"; +import { JSONFilePreset } from "lowdb/node"; import { SchemaDrivenExecutionEngine } from "../agents/orchestration.js"; import { parseAgentManifest, type AgentManifest } from "../agents/manifest.js"; +import { FileSystemProjectContextStore } from "../agents/project-context.js"; import type { ActorExecutionResult, ActorExecutor, PipelineAggregateStatus, } from "../agents/pipeline.js"; +import { + FileSystemSessionMetadataStore, + SessionWorktreeManager, + type SessionMetadata, +} from "../agents/session-lifecycle.js"; import { loadConfig, type AppConfig } from "../config.js"; import { parseEnvFile } from "../store/env-store.js"; import { @@ -240,7 +247,18 @@ async function loadRuntimeConfig(envPath: 
string): Promise> }); } -import { JSONFilePreset } from "lowdb/node"; +function resolveRuntimePaths(input: { + workspaceRoot: string; + config: Readonly; +}): { + stateRoot: string; + worktreeRoot: string; +} { + return { + stateRoot: resolve(input.workspaceRoot, input.config.orchestration.stateRoot), + worktreeRoot: resolve(input.workspaceRoot, input.config.provisioning.gitWorktree.rootDirectory), + }; +} async function writeRunMeta(input: { stateRoot: string; @@ -323,6 +341,103 @@ export class UiRunService { this.envFilePath = resolve(this.workspaceRoot, input.envFilePath ?? ".env"); } + private async loadRuntime(): Promise<{ + config: Readonly; + stateRoot: string; + sessionStore: FileSystemSessionMetadataStore; + worktreeManager: SessionWorktreeManager; + }> { + const config = await loadRuntimeConfig(this.envFilePath); + const paths = resolveRuntimePaths({ + workspaceRoot: this.workspaceRoot, + config, + }); + + return { + config, + stateRoot: paths.stateRoot, + sessionStore: new FileSystemSessionMetadataStore({ + stateRoot: paths.stateRoot, + }), + worktreeManager: new SessionWorktreeManager({ + worktreeRoot: paths.worktreeRoot, + baseRef: config.provisioning.gitWorktree.baseRef, + targetPath: config.provisioning.gitWorktree.targetPath, + }), + }; + } + + async createSession(input: { + projectPath: string; + sessionId?: string; + }): Promise { + const runtime = await this.loadRuntime(); + const sessionId = input.sessionId?.trim() || toSessionId(); + const baseWorkspacePath = runtime.worktreeManager.resolveBaseWorkspacePath(sessionId); + const session = await runtime.sessionStore.createSession({ + sessionId, + projectPath: resolve(input.projectPath), + baseWorkspacePath, + }); + + await runtime.worktreeManager.initializeSessionBaseWorkspace({ + sessionId: session.sessionId, + projectPath: session.projectPath, + baseWorkspacePath: session.baseWorkspacePath, + }); + + return session; + } + + async listSessions(): Promise { + const runtime = await 
this.loadRuntime(); + return runtime.sessionStore.listSessions(); + } + + async readSession(sessionId: string): Promise { + const runtime = await this.loadRuntime(); + return runtime.sessionStore.readSession(sessionId); + } + + async closeSession(input: { + sessionId: string; + mergeToProject?: boolean; + }): Promise { + const runtime = await this.loadRuntime(); + const session = await runtime.sessionStore.readSession(input.sessionId); + if (!session) { + throw new Error(`Session \"${input.sessionId}\" does not exist.`); + } + + const sessionProjectContextStore = new FileSystemProjectContextStore({ + filePath: runtime.sessionStore.getSessionProjectContextPath(session.sessionId), + }); + const projectContext = await sessionProjectContextStore.readState(); + const taskWorktreePaths = projectContext.taskQueue + .map((task) => task.worktreePath) + .filter((path): path is string => typeof path === "string" && path.trim().length > 0); + + const outcome = await runtime.worktreeManager.closeSession({ + session, + taskWorktreePaths, + mergeBaseIntoProject: input.mergeToProject === true, + }); + + if (outcome.kind === "fatal_error") { + throw new Error(`Session close failed: ${outcome.error}`); + } + + if (outcome.kind === "conflict") { + return runtime.sessionStore.updateSession(session.sessionId, { + sessionStatus: "closed_with_conflicts", + }); + } + + return runtime.sessionStore.updateSession(session.sessionId, { + sessionStatus: "closed", + }); + } + listRuns(): RunRecord[] { const output = [...this.runHistory.values()].sort((left, right) => { return right.startedAt.localeCompare(left.startedAt); @@ -335,11 +450,24 @@ export class UiRunService { } async startRun(input: StartRunInput): Promise { - const config = await loadRuntimeConfig(this.envFilePath); + const runtime = await this.loadRuntime(); + const config = runtime.config; const manifest = parseAgentManifest(input.manifest); const executionMode = input.executionMode ?? "mock"; const provider = input.provider ?? 
"codex"; const sessionId = input.sessionId?.trim() || toSessionId(); + const session = input.sessionId?.trim() + ? await runtime.sessionStore.readSession(sessionId) + : undefined; + if (input.sessionId?.trim() && !session) { + throw new Error(`Session \"${sessionId}\" does not exist.`); + } + if ( + session && + (session.sessionStatus === "closed" || session.sessionStatus === "closed_with_conflicts") + ) { + throw new Error(`Session \"${sessionId}\" is closed and cannot run new tasks.`); + } const runId = randomUUID(); const controller = new AbortController(); @@ -361,8 +489,9 @@ export class UiRunService { if (executionMode === "provider") { providerRuntime = await createProviderRunRuntime({ provider, - initialPrompt: input.prompt, config, + observabilityRootPath: this.workspaceRoot, + baseEnv: process.env, }); } @@ -380,11 +509,20 @@ export class UiRunService { actorExecutors, settings: { workspaceRoot: this.workspaceRoot, - stateRoot: config.orchestration.stateRoot, - projectContextPath: config.orchestration.projectContextPath, + stateRoot: runtime.stateRoot, + projectContextPath: session + ? runtime.sessionStore.getSessionProjectContextPath(sessionId) + : resolve(this.workspaceRoot, config.orchestration.projectContextPath), runtimeContext: { ui_mode: executionMode, run_provider: provider, + ...(session + ? { + session_id: sessionId, + project_path: session.projectPath, + base_workspace_path: session.baseWorkspacePath, + } + : {}), ...(input.runtimeContextOverrides ?? {}), }, }, @@ -392,7 +530,7 @@ export class UiRunService { }); await writeRunMeta({ - stateRoot: config.orchestration.stateRoot, + stateRoot: runtime.stateRoot, sessionId, run: record, }); @@ -408,6 +546,7 @@ export class UiRunService { }, }, signal: controller.signal, + ...(session ? 
{ sessionMetadata: session } : {}), }); const completedRecord = this.runHistory.get(runId); @@ -423,7 +562,7 @@ export class UiRunService { this.runHistory.set(runId, next); await writeRunMeta({ - stateRoot: config.orchestration.stateRoot, + stateRoot: runtime.stateRoot, sessionId, run: next, }); @@ -443,7 +582,7 @@ export class UiRunService { this.runHistory.set(runId, next); await writeRunMeta({ - stateRoot: config.orchestration.stateRoot, + stateRoot: runtime.stateRoot, sessionId, run: next, }); diff --git a/src/security/rules-engine.ts b/src/security/rules-engine.ts index 5b5d723..d8116ec 100644 --- a/src/security/rules-engine.ts +++ b/src/security/rules-engine.ts @@ -8,46 +8,49 @@ import { import { parseShellValidationPolicy, parseToolClearancePolicy, + type SecurityViolationHandling, type ShellValidationPolicy, type ToolClearancePolicy, } from "./schemas.js"; export type SecurityAuditEvent = - | { + | ({ type: "shell.command_profiled"; - timestamp: string; command: string; cwd: string; parsed: ParsedShellScript; - } - | { + } & SecurityAuditContext) + | ({ type: "shell.command_allowed"; - timestamp: string; command: string; cwd: string; commandCount: number; - } - | { + } & SecurityAuditContext) + | ({ type: "shell.command_blocked"; - timestamp: string; command: string; cwd: string; reason: string; code: string; details?: Record; - } - | { + } & SecurityAuditContext) + | ({ type: "tool.invocation_allowed"; - timestamp: string; tool: string; - } - | { + } & SecurityAuditContext) + | ({ type: "tool.invocation_blocked"; - timestamp: string; tool: string; reason: string; code: string; - }; + } & SecurityAuditContext); + +export type SecurityAuditContext = { + timestamp: string; + sessionId?: string; + nodeId?: string; + attempt?: number; +}; export type SecurityAuditSink = (event: SecurityAuditEvent) => void; @@ -60,6 +63,10 @@ function normalizeToken(value: string): string { return value.trim(); } +function normalizeLookupToken(value: string): string { + return 
normalizeToken(value).toLowerCase(); +} + function hasPathTraversalSegment(token: string): boolean { const normalized = token.replaceAll("\\", "/"); if (normalized === ".." || normalized.startsWith("../") || normalized.endsWith("/..")) { @@ -98,10 +105,44 @@ function toToolSet(values: readonly string[]): Set { return out; } +function toCaseInsensitiveLookup(values: readonly string[]): Map { + const out = new Map(); + for (const value of values) { + const normalized = normalizeLookupToken(value); + if (!normalized || out.has(normalized)) { + continue; + } + out.set(normalized, value); + } + return out; +} + function toNow(): string { return new Date().toISOString(); } +function toAuditContext(input?: { + sessionId?: string; + nodeId?: string; + attempt?: number; +}): SecurityAuditContext { + const output: SecurityAuditContext = { + timestamp: toNow(), + }; + + if (input?.sessionId) { + output.sessionId = input.sessionId; + } + if (input?.nodeId) { + output.nodeId = input.nodeId; + } + if (typeof input?.attempt === "number" && Number.isInteger(input.attempt) && input.attempt >= 1) { + output.attempt = input.attempt; + } + + return output; +} + export class SecurityRulesEngine { private readonly policy: ShellValidationPolicy; private readonly allowedBinaries: Set; @@ -109,10 +150,14 @@ export class SecurityRulesEngine { private readonly blockedEnvAssignments: Set; private readonly worktreeRoot: string; private readonly protectedPaths: string[]; + private readonly violationHandling: SecurityViolationHandling; constructor( policy: ShellValidationPolicy, private readonly auditSink?: SecurityAuditSink, + options?: { + violationHandling?: SecurityViolationHandling; + }, ) { this.policy = parseShellValidationPolicy(policy); this.allowedBinaries = toToolSet(this.policy.allowedBinaries); @@ -120,6 +165,7 @@ export class SecurityRulesEngine { this.blockedEnvAssignments = toToolSet(this.policy.blockedEnvAssignments); this.worktreeRoot = resolve(this.policy.worktreeRoot); 
this.protectedPaths = this.policy.protectedPaths.map((path) => resolve(path)); + this.violationHandling = options?.violationHandling ?? "hard_abort"; } getPolicy(): ShellValidationPolicy { @@ -136,6 +182,11 @@ export class SecurityRulesEngine { command: string; cwd: string; toolClearance?: ToolClearancePolicy; + context?: { + sessionId?: string; + nodeId?: string; + attempt?: number; + }; }): Promise { const resolvedCwd = resolve(input.cwd); @@ -147,22 +198,22 @@ export class SecurityRulesEngine { : undefined; this.emit({ + ...toAuditContext(input.context), type: "shell.command_profiled", - timestamp: toNow(), command: input.command, cwd: resolvedCwd, parsed, }); for (const command of parsed.commands) { - this.assertBinaryAllowed(command, toolClearance); + this.assertBinaryAllowed(command, toolClearance, input.context); this.assertAssignmentsAllowed(command); this.assertArgumentPaths(command, resolvedCwd); } this.emit({ + ...toAuditContext(input.context), type: "shell.command_allowed", - timestamp: toNow(), command: input.command, cwd: resolvedCwd, commandCount: parsed.commandCount, @@ -175,14 +226,23 @@ export class SecurityRulesEngine { } catch (error) { if (error instanceof SecurityViolationError) { this.emit({ + ...toAuditContext(input.context), type: "shell.command_blocked", - timestamp: toNow(), command: input.command, cwd: resolvedCwd, reason: error.message, code: error.code, details: error.details, }); + if (this.violationHandling === "dangerous_warn_only") { + return { + cwd: resolvedCwd, + parsed: { + commandCount: 0, + commands: [], + }, + }; + } throw error; } @@ -196,13 +256,21 @@ export class SecurityRulesEngine { assertToolInvocationAllowed(input: { tool: string; toolClearance: ToolClearancePolicy; + context?: { + sessionId?: string; + nodeId?: string; + attempt?: number; + }; }): void { const policy = parseToolClearancePolicy(input.toolClearance); + const normalizedTool = normalizeLookupToken(input.tool); + const banlistLookup = 
toCaseInsensitiveLookup(policy.banlist); + const allowlistLookup = toCaseInsensitiveLookup(policy.allowlist); - if (policy.banlist.includes(input.tool)) { + if (banlistLookup.has(normalizedTool)) { this.emit({ + ...toAuditContext(input.context), type: "tool.invocation_blocked", - timestamp: toNow(), tool: input.tool, reason: `Tool "${input.tool}" is explicitly banned by policy.`, code: "TOOL_BANNED", @@ -218,10 +286,10 @@ export class SecurityRulesEngine { ); } - if (policy.allowlist.length > 0 && !policy.allowlist.includes(input.tool)) { + if (policy.allowlist.length > 0 && !allowlistLookup.has(normalizedTool)) { this.emit({ + ...toAuditContext(input.context), type: "tool.invocation_blocked", - timestamp: toNow(), tool: input.tool, reason: `Tool "${input.tool}" is not present in allowlist.`, code: "TOOL_NOT_ALLOWED", @@ -238,21 +306,23 @@ export class SecurityRulesEngine { } this.emit({ + ...toAuditContext(input.context), type: "tool.invocation_allowed", - timestamp: toNow(), tool: input.tool, }); } filterAllowedTools(tools: string[], toolClearance: ToolClearancePolicy): string[] { const policy = parseToolClearancePolicy(toolClearance); + const allowlistLookup = toCaseInsensitiveLookup(policy.allowlist); + const banlistLookup = toCaseInsensitiveLookup(policy.banlist); const allowedByAllowlist = policy.allowlist.length === 0 ? 
tools - : tools.filter((tool) => policy.allowlist.includes(tool)); + : tools.filter((tool) => allowlistLookup.has(normalizeLookupToken(tool))); - return allowedByAllowlist.filter((tool) => !policy.banlist.includes(tool)); + return allowedByAllowlist.filter((tool) => !banlistLookup.has(normalizeLookupToken(tool))); } private assertCwdBoundary(cwd: string): void { @@ -290,6 +360,11 @@ export class SecurityRulesEngine { private assertBinaryAllowed( command: ParsedShellCommand, toolClearance?: ToolClearancePolicy, + context?: { + sessionId?: string; + nodeId?: string; + attempt?: number; + }, ): void { const binaryToken = normalizeToken(command.binary); const binaryName = basename(binaryToken); @@ -313,6 +388,7 @@ export class SecurityRulesEngine { this.assertToolInvocationAllowed({ tool: binaryName, toolClearance, + context, }); } diff --git a/src/security/schemas.ts b/src/security/schemas.ts index d7a3437..a1e9662 100644 --- a/src/security/schemas.ts +++ b/src/security/schemas.ts @@ -157,11 +157,15 @@ export function parseParsedShellScript(input: unknown): ParsedShellScript { }; } -export type SecurityViolationHandling = "hard_abort" | "validation_fail"; +export type SecurityViolationHandling = + | "hard_abort" + | "validation_fail" + | "dangerous_warn_only"; export const securityViolationHandlingSchema = z.union([ z.literal("hard_abort"), z.literal("validation_fail"), + z.literal("dangerous_warn_only"), ]); export function parseSecurityViolationHandling(input: unknown): SecurityViolationHandling { diff --git a/src/store/config-store.ts b/src/store/config-store.ts index 109af78..bdd3766 100644 --- a/src/store/config-store.ts +++ b/src/store/config-store.ts @@ -1,5 +1,6 @@ import { resolve } from "node:path"; import { loadConfig, type AppConfig } from "../config.js"; +import type { SecurityViolationHandling } from "../security/index.js"; import { parseEnvFile, writeEnvFileUpdates } from "./env-store.js"; export type RuntimeNotificationSettings = { @@ -9,7 +10,7 @@ 
export type RuntimeNotificationSettings = { }; export type SecurityPolicySettings = { - violationMode: "hard_abort" | "validation_fail"; + violationMode: SecurityViolationHandling; allowedBinaries: string[]; commandTimeoutMs: number; inheritedEnv: string[]; @@ -23,6 +24,7 @@ export type LimitSettings = { topologyMaxDepth: number; topologyMaxRetries: number; relationshipMaxChildren: number; + mergeConflictMaxAttempts: number; portBase: number; portBlockSize: number; portBlockCount: number; @@ -38,6 +40,7 @@ export type UiConfigSnapshot = { stateRoot: string; projectContextPath: string; runtimeEventLogPath: string; + claudeTraceLogPath: string; securityAuditLogPath: string; }; }; @@ -88,6 +91,7 @@ function toLimits(config: Readonly): LimitSettings { topologyMaxDepth: config.orchestration.maxDepth, topologyMaxRetries: config.orchestration.maxRetries, relationshipMaxChildren: config.orchestration.maxChildren, + mergeConflictMaxAttempts: config.orchestration.mergeConflictMaxAttempts, portBase: config.provisioning.portRange.basePort, portBlockSize: config.provisioning.portRange.blockSize, portBlockCount: config.provisioning.portRange.blockCount, @@ -105,6 +109,7 @@ function toSnapshot(config: Readonly, envFilePath: string): UiConfigS stateRoot: config.orchestration.stateRoot, projectContextPath: config.orchestration.projectContextPath, runtimeEventLogPath: config.runtimeEvents.logPath, + claudeTraceLogPath: config.provider.claudeObservability.logPath, securityAuditLogPath: config.security.auditLogPath, }, }; @@ -170,6 +175,7 @@ export class UiConfigStore { AGENT_TOPOLOGY_MAX_DEPTH: String(input.topologyMaxDepth), AGENT_TOPOLOGY_MAX_RETRIES: String(input.topologyMaxRetries), AGENT_RELATIONSHIP_MAX_CHILDREN: String(input.relationshipMaxChildren), + AGENT_MERGE_CONFLICT_MAX_ATTEMPTS: String(input.mergeConflictMaxAttempts), AGENT_PORT_BASE: String(input.portBase), AGENT_PORT_BLOCK_SIZE: String(input.portBlockSize), AGENT_PORT_BLOCK_COUNT: String(input.portBlockCount), diff 
--git a/src/ui/claude-observability.ts b/src/ui/claude-observability.ts new file mode 100644 index 0000000..35013b8 --- /dev/null +++ b/src/ui/claude-observability.ts @@ -0,0 +1,821 @@ +import { randomUUID } from "node:crypto"; +import { appendFile, mkdir } from "node:fs/promises"; +import { dirname, resolve } from "node:path"; +import type { Options, SDKMessage } from "@anthropic-ai/claude-agent-sdk"; +import type { + ClaudeObservabilityMode, + ClaudeObservabilityRuntimeConfig, + ClaudeObservabilityVerbosity, +} from "../config.js"; +import type { JsonObject, JsonValue } from "../agents/types.js"; + +const MAX_STRING_LENGTH = 320; +const MAX_ARRAY_ITEMS = 20; +const MAX_OBJECT_KEYS = 60; +const MAX_DEPTH = 6; + +const NON_SECRET_TOKEN_KEYS = new Set([ + "input_tokens", + "output_tokens", + "total_tokens", + "cache_creation_input_tokens", + "cache_read_input_tokens", + "ephemeral_1h_input_tokens", + "ephemeral_5m_input_tokens", + "token_input", + "token_output", + "token_total", + "tokencount", + "token_count", + "tool_use_id", + "parent_tool_use_id", + "task_id", + "session_id", +]); + +type ClaudeTraceContext = { + sessionId: string; + nodeId: string; + attempt: number; + depth: number; +}; + +type ClaudeTraceRecord = { + id: string; + timestamp: string; + source: "claude_sdk"; + stage: + | "query.started" + | "query.message" + | "query.stderr" + | "query.completed" + | "query.error"; + message: string; + sessionId: string; + nodeId: string; + attempt: number; + depth: number; + sdkSessionId?: string; + sdkMessageType?: string; + sdkMessageSubtype?: string; + data?: JsonObject; +}; + +function truncate(value: string, maxLength = MAX_STRING_LENGTH): string { + if (value.length <= maxLength) { + return value; + } + return `${value.slice(0, maxLength)}...`; +} + +function isSensitiveKey(key: string): boolean { + const normalized = key.trim().toLowerCase(); + if (!normalized) { + return false; + } + + if (NON_SECRET_TOKEN_KEYS.has(normalized)) { + return false; + } + 
+ if (/(api[_-]?key|secret|password|authorization|cookie)/i.test(key)) { + return true; + } + + if (/(auth[_-]?token|access[_-]?token|refresh[_-]?token|id[_-]?token|oauth)/i.test(key)) { + return true; + } + + return normalized === "token"; +} + +function toJsonPrimitive(value: unknown): JsonValue { + if (value === null) { + return null; + } + if (typeof value === "string") { + return truncate(value); + } + if (typeof value === "number") { + return Number.isFinite(value) ? value : String(value); + } + if (typeof value === "boolean") { + return value; + } + if (typeof value === "bigint") { + return String(value); + } + if (typeof value === "undefined") { + return null; + } + return truncate(String(value)); +} + +function sanitizeJsonValue(value: unknown, depth = 0): JsonValue { + if (depth >= MAX_DEPTH) { + return "[depth_limit]"; + } + + if ( + value === null || + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" || + typeof value === "bigint" || + typeof value === "undefined" + ) { + return toJsonPrimitive(value); + } + + if (Array.isArray(value)) { + const output = value.slice(0, MAX_ARRAY_ITEMS).map((entry) => sanitizeJsonValue(entry, depth + 1)); + if (value.length > MAX_ARRAY_ITEMS) { + output.push(`[+${String(value.length - MAX_ARRAY_ITEMS)} more]`); + } + return output; + } + + if (typeof value === "object") { + const output: JsonObject = {}; + const entries = Object.entries(value as Record); + const limited = entries.slice(0, MAX_OBJECT_KEYS); + for (const [key, entryValue] of limited) { + if (isSensitiveKey(key)) { + output[key] = "[redacted]"; + continue; + } + output[key] = sanitizeJsonValue(entryValue, depth + 1); + } + if (entries.length > MAX_OBJECT_KEYS) { + output.__truncated_keys = entries.length - MAX_OBJECT_KEYS; + } + return output; + } + + return truncate(String(value)); +} + +function readString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const 
trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} + +function readNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function readBoolean(value: unknown): boolean | undefined { + return typeof value === "boolean" ? value : undefined; +} + +function toMessageRecord(message: SDKMessage): Record { + return message as unknown as Record; +} + +function toMessageSubtype(message: SDKMessage): string | undefined { + return readString(toMessageRecord(message).subtype); +} + +function toMessageSessionId(message: SDKMessage): string | undefined { + return readString(toMessageRecord(message).session_id); +} + +function toTaskNotificationSummary(message: SDKMessage): { + summary: string; + data?: JsonObject; +} { + const raw = toMessageRecord(message); + const status = readString(raw.status) ?? "unknown"; + const data: JsonObject = { + status, + }; + + const taskId = readString(raw.task_id); + if (taskId) { + data.taskId = taskId; + } + + const summaryText = readString(raw.summary); + if (summaryText) { + data.summary = truncate(summaryText); + } + + const outputFile = readString(raw.output_file); + if (outputFile) { + data.outputFile = outputFile; + } + + if (raw.usage !== undefined) { + data.usage = sanitizeJsonValue(raw.usage); + } + + return { + summary: `Task notification: ${status}.`, + data, + }; +} + +function toTaskStartedSummary(message: SDKMessage): { + summary: string; + data?: JsonObject; +} { + const raw = toMessageRecord(message); + const data: JsonObject = {}; + + const taskId = readString(raw.task_id); + if (taskId) { + data.taskId = taskId; + } + + const description = readString(raw.description); + if (description) { + data.description = truncate(description); + } + + const taskType = readString(raw.task_type); + if (taskType) { + data.taskType = taskType; + } + + const toolUseId = readString(raw.tool_use_id); + if (toolUseId) { + data.toolUseId = 
toolUseId; + } + + return { + summary: "Task started.", + ...(Object.keys(data).length > 0 ? { data } : {}), + }; +} + +function toMessageSummary(message: SDKMessage): { + summary: string; + data?: JsonObject; +} { + const subtype = toMessageSubtype(message); + const raw = toMessageRecord(message); + + if (message.type === "result") { + if (message.subtype === "success") { + return { + summary: "Claude query result success.", + data: { + stopReason: message.stop_reason ?? null, + numTurns: message.num_turns, + usage: sanitizeJsonValue(message.usage) as JsonObject, + totalCostUsd: message.total_cost_usd, + }, + }; + } + + return { + summary: `Claude query result ${message.subtype}.`, + data: { + stopReason: message.stop_reason ?? null, + numTurns: message.num_turns, + usage: sanitizeJsonValue(message.usage) as JsonObject, + totalCostUsd: message.total_cost_usd, + errors: sanitizeJsonValue(message.errors), + }, + }; + } + + if (message.type === "tool_progress") { + return { + summary: `Tool progress: ${message.tool_name}.`, + data: { + toolName: message.tool_name, + toolUseId: message.tool_use_id, + elapsedTimeSeconds: message.elapsed_time_seconds, + parentToolUseId: message.parent_tool_use_id ?? null, + ...(message.task_id ? { taskId: message.task_id } : {}), + }, + }; + } + + if (message.type === "tool_use_summary") { + return { + summary: "Tool use summary emitted.", + data: { + summary: truncate(message.summary), + precedingToolUseIds: sanitizeJsonValue(message.preceding_tool_use_ids), + }, + }; + } + + if (message.type === "stream_event") { + const data: JsonObject = {}; + const eventType = readString((raw.event as Record | undefined)?.type); + if (eventType) { + data.eventType = eventType; + } + const parentToolUseId = readString(raw.parent_tool_use_id); + if (parentToolUseId) { + data.parentToolUseId = parentToolUseId; + } + return { + summary: "Partial assistant stream event emitted.", + ...(Object.keys(data).length > 0 ? 
{ data } : {}), + }; + } + + if (message.type === "auth_status") { + return { + summary: message.isAuthenticating ? "Authentication in progress." : "Authentication status update.", + data: { + isAuthenticating: message.isAuthenticating, + output: sanitizeJsonValue(message.output), + ...(message.error ? { error: truncate(message.error) } : {}), + }, + }; + } + + if (message.type === "assistant") { + return { + summary: "Assistant message emitted.", + data: { + parentToolUseId: message.parent_tool_use_id ?? null, + ...(message.error ? { error: message.error } : {}), + }, + }; + } + + if (message.type === "user") { + const data: JsonObject = { + parentToolUseId: (message as { parent_tool_use_id?: string | null }).parent_tool_use_id ?? null, + }; + const isSynthetic = readBoolean(raw.isSynthetic); + if (isSynthetic !== undefined) { + data.isSynthetic = isSynthetic; + } + const isReplay = readBoolean(raw.isReplay); + if (isReplay !== undefined) { + data.isReplay = isReplay; + } + return { + summary: "User message emitted.", + data, + }; + } + + if (subtype === "task_notification") { + return toTaskNotificationSummary(message); + } + + if (subtype === "task_started") { + return toTaskStartedSummary(message); + } + + if (message.type === "system" && subtype === "files_persisted") { + const files = Array.isArray(raw.files) ? raw.files : []; + const failed = Array.isArray(raw.failed) ? raw.failed : []; + return { + summary: "System event: files_persisted.", + data: { + persistedFileCount: files.length, + failedFileCount: failed.length, + }, + }; + } + + if (message.type === "system" && subtype === "compact_boundary") { + return { + summary: "System event: compact_boundary.", + data: { + compactMetadata: sanitizeJsonValue(raw.compact_metadata), + }, + }; + } + + if (message.type === "system" && subtype === "status") { + const data: JsonObject = { + status: readString(raw.status) ?? 
"none", + }; + const permissionMode = readString(raw.permissionMode); + if (permissionMode) { + data.permissionMode = permissionMode; + } + return { + summary: "System event: status.", + data, + }; + } + + if (message.type === "system" && (subtype === "hook_started" || subtype === "hook_progress" || subtype === "hook_response")) { + const data: JsonObject = { + ...(subtype ? { subtype } : {}), + ...(readString(raw.hook_id) ? { hookId: readString(raw.hook_id) } : {}), + ...(readString(raw.hook_name) ? { hookName: readString(raw.hook_name) } : {}), + ...(readString(raw.hook_event) ? { hookEvent: readString(raw.hook_event) } : {}), + ...(readString(raw.outcome) ? { outcome: readString(raw.outcome) } : {}), + }; + if (raw.exit_code !== undefined) { + data.exitCode = sanitizeJsonValue(raw.exit_code); + } + return { + summary: `System event: ${subtype}.`, + data, + }; + } + + if (message.type === "system") { + return { + summary: subtype ? `System event: ${subtype}.` : "System event emitted.", + data: subtype ? { subtype } : undefined, + }; + } + + if (message.type === "rate_limit") { + return { + summary: "Rate limit event emitted.", + data: sanitizeJsonValue(raw) as JsonObject, + }; + } + + if (message.type === "prompt_suggestion") { + const data: JsonObject = { + ...(readString(raw.prompt) ? { prompt: truncate(readString(raw.prompt) as string) } : {}), + ...(readString(raw.suggestion) ? { suggestion: truncate(readString(raw.suggestion) as string) } : {}), + }; + return { + summary: "Prompt suggestion emitted.", + ...(Object.keys(data).length > 0 ? 
{ data } : {}), + }; + } + + return { + summary: `Claude SDK message received (${message.type}).`, + }; +} + +function toRecord(input: { + stage: ClaudeTraceRecord["stage"]; + message: string; + context: ClaudeTraceContext; + sdkMessageType?: string; + sdkMessageSubtype?: string; + sdkSessionId?: string; + data?: JsonObject; +}): ClaudeTraceRecord { + return { + id: randomUUID(), + timestamp: new Date().toISOString(), + source: "claude_sdk", + stage: input.stage, + message: input.message, + sessionId: input.context.sessionId, + nodeId: input.context.nodeId, + attempt: input.context.attempt, + depth: input.context.depth, + ...(input.sdkMessageType ? { sdkMessageType: input.sdkMessageType } : {}), + ...(input.sdkMessageSubtype ? { sdkMessageSubtype: input.sdkMessageSubtype } : {}), + ...(input.sdkSessionId ? { sdkSessionId: input.sdkSessionId } : {}), + ...(input.data ? { data: input.data } : {}), + }; +} + +export function summarizeClaudeMessage( + message: SDKMessage, + verbosity: ClaudeObservabilityVerbosity, +): { + messageType: string; + messageSubtype?: string; + sdkSessionId?: string; + summary: string; + data?: JsonObject; +} { + const messageSubtype = toMessageSubtype(message); + const sdkSessionId = toMessageSessionId(message); + const summary = toMessageSummary(message); + if (verbosity === "full") { + return { + messageType: message.type, + ...(messageSubtype ? { messageSubtype } : {}), + ...(sdkSessionId ? { sdkSessionId } : {}), + summary: summary.summary, + data: { + message: sanitizeJsonValue(message) as JsonObject, + }, + }; + } + + return { + messageType: message.type, + ...(messageSubtype ? { messageSubtype } : {}), + ...(sdkSessionId ? { sdkSessionId } : {}), + summary: summary.summary, + ...(summary.data ? 
{ data: summary.data } : {}), + }; +} + +export class ClaudeObservabilityLogger { + private readonly mode: ClaudeObservabilityMode; + private readonly verbosity: ClaudeObservabilityVerbosity; + private readonly logPath: string; + private readonly includePartialMessages: boolean; + private readonly debug: boolean; + private readonly debugLogPath?: string; + private readonly pendingWrites = new Set>(); + private readonly stdoutProgressByKey = new Map(); + private readonly fileProgressByKey = new Map(); + private readonly stdoutStreamByKey = new Map(); + private readonly fileStreamByKey = new Map(); + private fileWriteFailureCount = 0; + + constructor(input: { + workspaceRoot: string; + config: ClaudeObservabilityRuntimeConfig; + }) { + this.mode = input.config.mode; + this.verbosity = input.config.verbosity; + this.logPath = resolve(input.workspaceRoot, input.config.logPath); + this.includePartialMessages = input.config.includePartialMessages; + this.debug = input.config.debug; + this.debugLogPath = input.config.debugLogPath + ? resolve(input.workspaceRoot, input.config.debugLogPath) + : undefined; + } + + isEnabled(): boolean { + return this.mode !== "off"; + } + + toOptionOverrides(input: { + context: ClaudeTraceContext; + }): Pick { + return { + includePartialMessages: this.includePartialMessages, + debug: this.debug || this.debugLogPath !== undefined, + ...(this.debugLogPath ? { debugFile: this.debugLogPath } : {}), + stderr: (data: string): void => { + this.record({ + stage: "query.stderr", + message: "Claude SDK stderr output.", + context: input.context, + data: { + stderr: sanitizeJsonValue(data), + }, + }); + }, + }; + } + + recordQueryStarted(input: { + context: ClaudeTraceContext; + data?: JsonObject; + }): void { + this.record({ + stage: "query.started", + message: "Claude query started.", + context: input.context, + ...(input.data ? 
{ data: input.data } : {}), + }); + } + + recordMessage(input: { + context: ClaudeTraceContext; + message: SDKMessage; + }): void { + const summarized = summarizeClaudeMessage(input.message, this.verbosity); + this.record({ + stage: "query.message", + message: summarized.summary, + context: input.context, + sdkMessageType: summarized.messageType, + sdkMessageSubtype: summarized.messageSubtype, + sdkSessionId: summarized.sdkSessionId, + ...(summarized.data ? { data: summarized.data } : {}), + }); + } + + recordQueryCompleted(input: { + context: ClaudeTraceContext; + data?: JsonObject; + }): void { + this.record({ + stage: "query.completed", + message: "Claude query completed.", + context: input.context, + ...(input.data ? { data: input.data } : {}), + }); + } + + recordQueryError(input: { + context: ClaudeTraceContext; + error: unknown; + }): void { + const errorMessage = input.error instanceof Error ? input.error.message : String(input.error); + this.record({ + stage: "query.error", + message: "Claude query failed.", + context: input.context, + data: { + error: truncate(errorMessage), + }, + }); + } + + async close(): Promise { + await Promise.all([...this.pendingWrites]); + } + + private record(input: { + stage: ClaudeTraceRecord["stage"]; + message: string; + context: ClaudeTraceContext; + sdkMessageType?: string; + sdkMessageSubtype?: string; + sdkSessionId?: string; + data?: JsonObject; + }): void { + if (!this.isEnabled()) { + return; + } + + const record = toRecord(input); + + if (this.mode === "stdout" || this.mode === "both") { + const stdoutRecord = this.toStdoutRecord(record); + if (stdoutRecord) { + console.log(`[claude-trace] ${JSON.stringify(stdoutRecord)}`); + } + } + + if (this.mode === "file" || this.mode === "both") { + const fileRecord = this.toFileRecord(record); + if (!fileRecord) { + return; + } + const line = JSON.stringify(fileRecord); + const write = mkdir(dirname(this.logPath), { recursive: true }) + .then(() => appendFile(this.logPath, 
`${line}\n`, "utf8")) + .catch((error: unknown) => { + this.reportFileWriteFailure(error); + }) + .finally(() => { + this.pendingWrites.delete(write); + }); + this.pendingWrites.add(write); + } + } + + private toStdoutRecord(record: ClaudeTraceRecord): ClaudeTraceRecord | undefined { + return this.toFilteredMessageRecord(record, "stdout"); + } + + private toFileRecord(record: ClaudeTraceRecord): ClaudeTraceRecord | undefined { + return this.toFilteredMessageRecord(record, "file"); + } + + private toFilteredMessageRecord( + record: ClaudeTraceRecord, + destination: "stdout" | "file", + ): ClaudeTraceRecord | undefined { + if (record.stage !== "query.message") { + return record; + } + + if (!record.sdkMessageType) { + return record; + } + + if (record.sdkMessageType === "tool_progress") { + return this.toSampledToolProgressRecord(record, destination); + } + + if (record.sdkMessageType === "stream_event") { + if (!this.includePartialMessages) { + return undefined; + } + return this.toSampledStreamEventRecord(record, destination); + } + + if (record.sdkMessageType === "auth_status") { + const data = record.data; + const isAuthenticating = data?.isAuthenticating === true; + const hasError = typeof data?.error === "string" && data.error.trim().length > 0; + if (hasError || !isAuthenticating) { + return record; + } + return undefined; + } + + return record; + } + + private toSampledToolProgressRecord( + record: ClaudeTraceRecord, + destination: "stdout" | "file", + ): ClaudeTraceRecord | undefined { + const now = Date.now(); + const minIntervalMs = destination === "stdout" ? 1000 : 2000; + const rawToolUseId = record.data?.toolUseId; + const toolUseId = typeof rawToolUseId === "string" ? rawToolUseId : "unknown"; + const key = `${destination}:${record.sessionId}:${record.nodeId}:${toolUseId}`; + const progressByKey = destination === "stdout" ? 
this.stdoutProgressByKey : this.fileProgressByKey; + const state = progressByKey.get(key); + + if (!state) { + progressByKey.set(key, { + lastEmittedAt: now, + suppressed: 0, + }); + return record; + } + + if (now - state.lastEmittedAt < minIntervalMs) { + state.suppressed += 1; + return undefined; + } + + state.lastEmittedAt = now; + const suppressed = state.suppressed; + state.suppressed = 0; + + if (suppressed < 1) { + return record; + } + + const nextData: JsonObject = { + ...(record.data ?? {}), + suppressedSinceLastEmit: suppressed, + }; + + return { + ...record, + data: nextData, + }; + } + + private toSampledStreamEventRecord( + record: ClaudeTraceRecord, + destination: "stdout" | "file", + ): ClaudeTraceRecord | undefined { + const now = Date.now(); + const minIntervalMs = destination === "stdout" ? 700 : 1200; + const key = `${destination}:${record.sessionId}:${record.nodeId}:stream`; + const streamByKey = destination === "stdout" ? this.stdoutStreamByKey : this.fileStreamByKey; + const state = streamByKey.get(key); + + if (!state) { + streamByKey.set(key, { + lastEmittedAt: now, + suppressed: 0, + }); + return record; + } + + if (now - state.lastEmittedAt < minIntervalMs) { + state.suppressed += 1; + return undefined; + } + + state.lastEmittedAt = now; + const suppressed = state.suppressed; + state.suppressed = 0; + + if (suppressed < 1) { + return record; + } + + const nextData: JsonObject = { + ...(record.data ?? {}), + suppressedStreamEventsSinceLastEmit: suppressed, + }; + + return { + ...record, + data: nextData, + }; + } + + private reportFileWriteFailure(error: unknown): void { + this.fileWriteFailureCount += 1; + if (this.fileWriteFailureCount <= 5) { + const message = error instanceof Error ? 
error.message : String(error); + console.warn( + `[claude-trace] failed to append trace log to ${this.logPath}: ${truncate(message, 180)}`, + ); + return; + } + + if (this.fileWriteFailureCount === 6) { + console.warn("[claude-trace] additional trace-log write failures suppressed."); + } + } +} diff --git a/src/ui/claude-trace-store.ts b/src/ui/claude-trace-store.ts new file mode 100644 index 0000000..7ff0bd9 --- /dev/null +++ b/src/ui/claude-trace-store.ts @@ -0,0 +1,85 @@ +import { readFile } from "node:fs/promises"; +import { resolve } from "node:path"; + +export type ClaudeTraceEvent = { + timestamp: string; + message: string; + stage?: string; + sessionId?: string; + sdkMessageType?: string; + sdkMessageSubtype?: string; + data?: unknown; +} & Record; + +type ClaudeTraceFilter = { + sessionId?: string; + limit?: number; +}; + +function safeParseLine(line: string): ClaudeTraceEvent | undefined { + const trimmed = line.trim(); + if (!trimmed) { + return undefined; + } + + try { + const parsed = JSON.parse(trimmed) as unknown; + if (!parsed || typeof parsed !== "object") { + return undefined; + } + + const record = parsed as Record; + if (typeof record.timestamp !== "string" || typeof record.message !== "string") { + return undefined; + } + + return record as ClaudeTraceEvent; + } catch { + return undefined; + } +} + +export async function readClaudeTraceEvents(logPath: string): Promise { + const absolutePath = resolve(logPath); + let content = ""; + + try { + content = await readFile(absolutePath, "utf8"); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return []; + } + throw error; + } + + const parsed: ClaudeTraceEvent[] = []; + for (const line of content.split(/\r?\n/)) { + const event = safeParseLine(line); + if (event) { + parsed.push(event); + } + } + + parsed.sort((left, right) => left.timestamp.localeCompare(right.timestamp)); + return parsed; +} + +export function filterClaudeTraceEvents( + events: readonly 
ClaudeTraceEvent[], + filter: ClaudeTraceFilter, +): ClaudeTraceEvent[] { + const filtered: ClaudeTraceEvent[] = []; + + for (const event of events) { + if (filter.sessionId && event.sessionId !== filter.sessionId) { + continue; + } + filtered.push(event); + } + + if (!filter.limit || filter.limit < 1 || filtered.length <= filter.limit) { + return filtered; + } + + return filtered.slice(-filter.limit); +} diff --git a/src/ui/public/app.js b/src/ui/public/app.js index 73d38a2..009e21b 100644 --- a/src/ui/public/app.js +++ b/src/ui/public/app.js @@ -2,6 +2,7 @@ const state = { config: null, manifests: [], sessions: [], + sessionMetadata: [], runs: [], selectedSessionId: "", selectedManifestPath: "", @@ -25,13 +26,22 @@ const dom = { runProvider: document.querySelector("#run-provider"), runTopologyHint: document.querySelector("#run-topology-hint"), runFlags: document.querySelector("#run-flags"), + runRuntimeContext: document.querySelector("#run-runtime-context"), runValidationNodes: document.querySelector("#run-validation-nodes"), killRun: document.querySelector("#kill-run"), runStatus: document.querySelector("#run-status"), + sessionForm: document.querySelector("#session-form"), + sessionProjectPath: document.querySelector("#session-project-path"), + sessionCreate: document.querySelector("#session-create"), + sessionClose: document.querySelector("#session-close"), + sessionCloseMerge: document.querySelector("#session-close-merge"), nodeInspector: document.querySelector("#node-inspector"), eventsLimit: document.querySelector("#events-limit"), eventsRefresh: document.querySelector("#events-refresh"), eventFeed: document.querySelector("#event-feed"), + claudeEventsLimit: document.querySelector("#claude-events-limit"), + claudeEventsRefresh: document.querySelector("#claude-events-refresh"), + claudeEventFeed: document.querySelector("#claude-event-feed"), historyRefresh: document.querySelector("#history-refresh"), historyBody: document.querySelector("#history-body"), 
notificationsForm: document.querySelector("#notifications-form"), @@ -77,6 +87,7 @@ const dom = { cfgTopologyDepth: document.querySelector("#cfg-topology-depth"), cfgTopologyRetries: document.querySelector("#cfg-topology-retries"), cfgRelationshipChildren: document.querySelector("#cfg-relationship-children"), + cfgMergeConflictAttempts: document.querySelector("#cfg-merge-conflict-attempts"), cfgPortBase: document.querySelector("#cfg-port-base"), cfgPortBlockSize: document.querySelector("#cfg-port-block-size"), cfgPortBlockCount: document.querySelector("#cfg-port-block-count"), @@ -111,10 +122,15 @@ const MANIFEST_EVENT_TRIGGERS = [ "requirements_defined", "tasks_planned", "code_committed", + "task_ready_for_review", "task_blocked", "validation_passed", "validation_failed", "branch_merged", + "merge_conflict_detected", + "merge_conflict_resolved", + "merge_conflict_unresolved", + "merge_retry_started", ]; const RUN_MANIFEST_EDITOR_VALUE = "__editor__"; @@ -129,8 +145,12 @@ const LABEL_HELP_BY_CONTROL = Object.freeze({ "run-provider": "Choose which model provider backend handles provider-mode runs.", "run-topology-hint": "Optional hint that nudges orchestration toward a topology strategy.", "run-flags": "Optional JSON object passed in as initial run flags.", + "run-runtime-context": "Optional JSON object of template values injected into persona prompts (for example repo or ticket).", "run-validation-nodes": "Optional comma-separated node IDs to simulate validation outcomes for.", + "session-project-path": "Absolute project path used when creating an explicit managed session.", + "session-close-merge": "When enabled, close will merge the session base branch back into the project branch.", "events-limit": "Set how many recent runtime events are loaded per refresh.", + "claude-events-limit": "Set how many Claude SDK trace records are loaded per refresh.", "cfg-webhook-url": "Webhook endpoint that receives runtime event notifications.", "cfg-webhook-severity": "Minimum 
severity level that triggers webhook notifications.", "cfg-webhook-always": "Event types that should always notify, regardless of severity.", @@ -145,6 +165,7 @@ const LABEL_HELP_BY_CONTROL = Object.freeze({ "cfg-topology-depth": "Maximum orchestration graph depth permitted by topology rules.", "cfg-topology-retries": "Maximum retry expansions allowed by topology orchestration.", "cfg-relationship-children": "Maximum children each persona relationship can spawn.", + "cfg-merge-conflict-attempts": "Maximum merge-conflict resolution attempts before emitting unresolved conflict events.", "cfg-port-base": "Starting port number for provisioning port allocations.", "cfg-port-block-size": "Number of ports reserved per allocated block.", "cfg-port-block-count": "Number of port blocks available for allocation.", @@ -1029,6 +1050,7 @@ async function loadConfig() { dom.cfgTopologyDepth.value = String(limits.topologyMaxDepth); dom.cfgTopologyRetries.value = String(limits.topologyMaxRetries); dom.cfgRelationshipChildren.value = String(limits.relationshipMaxChildren); + dom.cfgMergeConflictAttempts.value = String(limits.mergeConflictMaxAttempts); dom.cfgPortBase.value = String(limits.portBase); dom.cfgPortBlockSize.value = String(limits.portBlockSize); dom.cfgPortBlockCount.value = String(limits.portBlockCount); @@ -1060,11 +1082,28 @@ function statusChipClass(status) { return `status-chip status-${status || "unknown"}`; } +function getSessionLifecycleStatus(sessionId) { + const metadata = state.sessionMetadata.find((entry) => entry?.sessionId === sessionId); + if (!metadata) { + return undefined; + } + + const status = metadata.sessionStatus; + if (status === "active" || status === "suspended" || status === "closed" || status === "closed_with_conflicts") { + return status; + } + return undefined; +} + function renderRunsAndSessionsTable() { const rows = []; for (const session of state.sessions) { - const sessionStatus = session.status || "unknown"; + const lifecycleStatus = 
getSessionLifecycleStatus(session.sessionId); + const sessionStatus = + lifecycleStatus === "closed" || lifecycleStatus === "closed_with_conflicts" + ? lifecycleStatus + : session.status || lifecycleStatus || "unknown"; rows.push(` ${escapeHtml(session.sessionId)} @@ -1092,6 +1131,7 @@ function renderRunsAndSessionsTable() { async function loadSessions() { const payload = await apiRequest("/api/sessions"); state.sessions = payload.sessions || []; + state.sessionMetadata = payload.sessionMetadata || []; state.runs = payload.runs || []; if (!state.selectedSessionId && state.sessions.length > 0) { @@ -1457,6 +1497,43 @@ function renderEventFeed(events) { dom.eventFeed.innerHTML = rows || '
-
-
No runtime events.
'; } +function toClaudeRowSeverity(event) { + const stage = String(event?.stage || ""); + const type = String(event?.sdkMessageType || ""); + if (stage === "query.error") { + return "critical"; + } + if (stage === "query.stderr" || (type === "result" && String(event?.sdkMessageSubtype || "").startsWith("error_"))) { + return "warning"; + } + return "info"; +} + +function renderClaudeTraceFeed(events) { + const rows = [...events] + .reverse() + .map((event) => { + const ts = new Date(event.timestamp).toLocaleTimeString(); + const stage = String(event.stage || "query.message"); + const sdkMessageType = String(event.sdkMessageType || ""); + const sdkMessageSubtype = String(event.sdkMessageSubtype || ""); + const typeLabel = sdkMessageType + ? `${stage}/${sdkMessageType}${sdkMessageSubtype ? `:${sdkMessageSubtype}` : ""}` + : stage; + const message = typeof event.message === "string" ? event.message : JSON.stringify(event.message || ""); + return ` +
+
${escapeHtml(ts)}
+
${escapeHtml(typeLabel)}
+
${escapeHtml(message)}
+
+ `; + }) + .join(""); + + dom.claudeEventFeed.innerHTML = rows || '
-
-
No Claude trace events.
'; +} + async function refreshEvents() { const limit = Number(dom.eventsLimit.value || "150"); const params = new URLSearchParams({ @@ -1471,6 +1548,20 @@ async function refreshEvents() { renderEventFeed(payload.events || []); } +async function refreshClaudeTrace() { + const limit = Number(dom.claudeEventsLimit.value || "150"); + const params = new URLSearchParams({ + limit: String(limit), + }); + + if (state.selectedSessionId) { + params.set("sessionId", state.selectedSessionId); + } + + const payload = await apiRequest(`/api/claude-trace?${params.toString()}`); + renderClaudeTraceFeed(payload.events || []); +} + async function startRun(event) { event.preventDefault(); @@ -1486,6 +1577,12 @@ async function startRun(event) { return; } + const runtimeContext = parseJsonSafe(dom.runRuntimeContext.value, {}); + if (typeof runtimeContext !== "object" || Array.isArray(runtimeContext) || !runtimeContext) { + showRunStatus("Runtime Context Overrides must be a JSON object.", true); + return; + } + const manifestSelection = dom.runManifestSelect.value.trim(); const payload = { @@ -1494,9 +1591,21 @@ async function startRun(event) { provider: dom.runProvider.value, topologyHint: dom.runTopologyHint.value.trim() || undefined, initialFlags: flags, + runtimeContextOverrides: runtimeContext, simulateValidationNodeIds: fromCsv(dom.runValidationNodes.value), }; + const selectedSessionMetadata = state.sessionMetadata.find( + (entry) => entry?.sessionId === state.selectedSessionId, + ); + if ( + selectedSessionMetadata && + (selectedSessionMetadata.sessionStatus === "active" || + selectedSessionMetadata.sessionStatus === "suspended") + ) { + payload.sessionId = selectedSessionMetadata.sessionId; + } + if (manifestSelection === RUN_MANIFEST_EDITOR_VALUE) { const manifestFromEditor = parseJsonSafe(dom.manifestEditor.value, null); if (!manifestFromEditor) { @@ -1527,6 +1636,7 @@ async function startRun(event) { dom.sessionSelect.value = run.sessionId; await refreshGraph(); await 
refreshEvents(); + await refreshClaudeTrace(); } catch (error) { showRunStatus(error instanceof Error ? error.message : String(error), true); } @@ -1547,6 +1657,67 @@ async function cancelActiveRun() { await loadSessions(); await refreshGraph(); await refreshEvents(); + await refreshClaudeTrace(); + } catch (error) { + showRunStatus(error instanceof Error ? error.message : String(error), true); + } +} + +async function createSessionFromUi() { + const projectPath = dom.sessionProjectPath.value.trim(); + if (!projectPath) { + showRunStatus("Project path is required to create a session.", true); + return; + } + + try { + const payload = await apiRequest("/api/sessions", { + method: "POST", + body: JSON.stringify({ + projectPath, + }), + }); + + const created = payload.session; + if (created?.sessionId) { + state.selectedSessionId = created.sessionId; + showRunStatus(`Session ${created.sessionId} created.`); + } else { + showRunStatus("Session created."); + } + await loadSessions(); + if (state.selectedSessionId) { + dom.sessionSelect.value = state.selectedSessionId; + await refreshGraph(); + await refreshEvents(); + await refreshClaudeTrace(); + } + } catch (error) { + showRunStatus(error instanceof Error ? error.message : String(error), true); + } +} + +async function closeSelectedSessionFromUi() { + const sessionId = state.selectedSessionId || dom.sessionSelect.value; + if (!sessionId) { + showRunStatus("Select a session before closing.", true); + return; + } + + try { + const payload = await apiRequest(`/api/sessions/${encodeURIComponent(sessionId)}/close`, { + method: "POST", + body: JSON.stringify({ + mergeToProject: dom.sessionCloseMerge.checked, + }), + }); + + const nextStatus = payload?.session?.sessionStatus || "closed"; + showRunStatus(`Session ${sessionId} closed with status ${nextStatus}.`); + await loadSessions(); + await refreshGraph(); + await refreshEvents(); + await refreshClaudeTrace(); } catch (error) { showRunStatus(error instanceof Error ? 
error.message : String(error), true); } @@ -1597,6 +1768,7 @@ async function saveLimits(event) { topologyMaxDepth: Number(dom.cfgTopologyDepth.value), topologyMaxRetries: Number(dom.cfgTopologyRetries.value), relationshipMaxChildren: Number(dom.cfgRelationshipChildren.value), + mergeConflictMaxAttempts: Number(dom.cfgMergeConflictAttempts.value), portBase: Number(dom.cfgPortBase.value), portBlockSize: Number(dom.cfgPortBlockSize.value), portBlockCount: Number(dom.cfgPortBlockCount.value), @@ -1695,6 +1867,7 @@ function bindUiEvents() { state.selectedSessionId = dom.sessionSelect.value; await refreshGraph(); await refreshEvents(); + await refreshClaudeTrace(); }); dom.graphManifestSelect.addEventListener("change", async () => { @@ -1714,15 +1887,26 @@ function bindUiEvents() { await refreshEvents(); }); + dom.claudeEventsRefresh.addEventListener("click", async () => { + await refreshClaudeTrace(); + }); + dom.historyRefresh.addEventListener("click", async () => { await loadSessions(); await refreshGraph(); + await refreshClaudeTrace(); }); dom.runForm.addEventListener("submit", startRun); dom.killRun.addEventListener("click", () => { void cancelActiveRun(); }); + dom.sessionCreate.addEventListener("click", () => { + void createSessionFromUi(); + }); + dom.sessionClose.addEventListener("click", () => { + void closeSelectedSessionFromUi(); + }); dom.notificationsForm.addEventListener("submit", (event) => { void saveNotifications(event); @@ -1830,6 +2014,7 @@ async function refreshAll() { await refreshGraph(); await refreshEvents(); + await refreshClaudeTrace(); } async function initialize() { @@ -1860,6 +2045,10 @@ async function initialize() { void refreshEvents(); }, 3000); + setInterval(() => { + void refreshClaudeTrace(); + }, 3000); + setInterval(() => { void refreshGraph(); }, 7000); diff --git a/src/ui/public/index.html b/src/ui/public/index.html index e27a6cf..23693e2 100644 --- a/src/ui/public/index.html +++ b/src/ui/public/index.html @@ -75,6 +75,10 @@ 
Initial Flags (JSON) +