8 Commits

40 changed files with 4792 additions and 180 deletions

View File

@@ -16,6 +16,15 @@ CLAUDE_CODE_OAUTH_TOKEN=
ANTHROPIC_API_KEY=
CLAUDE_MODEL=
CLAUDE_CODE_PATH=
# Claude binary observability: off | stdout | file | both
CLAUDE_OBSERVABILITY_MODE=off
# CLAUDE_OBSERVABILITY_VERBOSITY: summary | full
CLAUDE_OBSERVABILITY_VERBOSITY=summary
# Relative to repository workspace root in UI/provider runs.
CLAUDE_OBSERVABILITY_LOG_PATH=.ai_ops/events/claude-trace.ndjson
CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL=false
CLAUDE_OBSERVABILITY_DEBUG=false
CLAUDE_OBSERVABILITY_DEBUG_LOG_PATH=
# Agent management limits
AGENT_MAX_CONCURRENT=4
@@ -28,6 +37,7 @@ AGENT_PROJECT_CONTEXT_PATH=.ai_ops/project-context.json
AGENT_TOPOLOGY_MAX_DEPTH=4
AGENT_TOPOLOGY_MAX_RETRIES=2
AGENT_RELATIONSHIP_MAX_CHILDREN=4
AGENT_MERGE_CONFLICT_MAX_ATTEMPTS=2
# Resource provisioning (hard + soft constraints)
AGENT_WORKTREE_ROOT=.ai_ops/worktrees

.gitignore vendored
View File

@@ -4,3 +4,4 @@ dist
mcp.config.json
.ai_ops
.agent-context
.workspace

View File

@@ -29,6 +29,7 @@
- `AGENT_TOPOLOGY_MAX_DEPTH`
- `AGENT_TOPOLOGY_MAX_RETRIES`
- `AGENT_RELATIONSHIP_MAX_CHILDREN`
- `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS`
- Provisioning/resource controls:
- `AGENT_WORKTREE_ROOT`
- `AGENT_WORKTREE_BASE_REF`

View File

@@ -20,10 +20,13 @@ TypeScript runtime for deterministic multi-agent execution with:
- Runtime events are emitted as best-effort side-channel telemetry and do not affect orchestration control flow.
- `AgentManager` is an internal utility used by the pipeline when fan-out/retry-unrolled behavior is required.
- Session state is persisted under `AGENT_STATE_ROOT`.
- Project state is persisted under `AGENT_PROJECT_CONTEXT_PATH` with schema-versioned JSON (`schemaVersion`) and domains:
- Session lifecycle is explicit (`POST /api/sessions`, `POST /api/sessions/:id/run`, `POST /api/sessions/:id/close`) and each session is bound to a target project path.
- Session project context is persisted as schema-versioned JSON (`schemaVersion`) with domains:
- `globalFlags`
- `artifactPointers`
- `taskQueue`
- each task record stores `taskId`, status, and optional `worktreePath` for task-scoped workspace ownership
- conflict-aware statuses are supported (`conflict`, `resolving_conflict`)
## Deep Dives
@@ -92,11 +95,13 @@ The UI provides:
- graph visualizer with topology/retry rendering, edge trigger labels, node economics (duration/cost/tokens), and critical-path highlighting
- node inspector with attempt metadata and injected `ResolvedExecutionContext` sandbox payload
- live runtime event feed from `AGENT_RUNTIME_EVENT_LOG_PATH` with severity coloring (including security mirror events)
- Claude trace feed from `CLAUDE_OBSERVABILITY_LOG_PATH` (query lifecycle, SDK message types/subtypes, and errors)
- run trigger + kill switch backed by `SchemaDrivenExecutionEngine.runSession(...)`
- run mode selector: `provider` (real Codex/Claude execution) or `mock` (deterministic dry-run executor)
- provider selector: `codex` or `claude`
- run history from `AGENT_STATE_ROOT`
- forms for runtime Discord webhook settings, security policy, and manager/resource limits
- hover help on form labels with short intent guidance for each field
- manifest editor/validator/saver for schema `"1"` manifests
Provider mode notes:
@@ -104,6 +109,7 @@ Provider mode notes:
- `provider=codex` uses existing OpenAI/Codex auth settings (`OPENAI_AUTH_MODE`, `CODEX_API_KEY`, `OPENAI_API_KEY`).
- `provider=claude` uses Claude auth resolution (`CLAUDE_CODE_OAUTH_TOKEN` preferred, otherwise `ANTHROPIC_API_KEY`, or existing Claude Code login state).
- `CLAUDE_MODEL` should be a Claude model id/alias recognized by Claude Code (for example `claude-sonnet-4-6`); `anthropic/...` prefixes are normalized automatically.
- Claude provider runs can emit Claude SDK/CLI internals to stdout and/or NDJSON with `CLAUDE_OBSERVABILITY_*` settings.
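The `anthropic/...` prefix normalization mentioned above could be as simple as this sketch (illustrative helper, not the actual implementation):

```ts
// Sketch: strip a leading "anthropic/" prefix from a model id before handing it to Claude Code.
function normalizeClaudeModel(model: string): string {
  const prefix = 'anthropic/';
  return model.startsWith(prefix) ? model.slice(prefix.length) : model;
}
```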
## Manifest Semantics
@@ -127,9 +133,9 @@ Pipeline edges can route via:
Domain events are typed and can trigger edges directly:
- planning: `requirements_defined`, `tasks_planned`
- execution: `code_committed`, `task_blocked`
- execution: `code_committed`, `task_ready_for_review`, `task_blocked`
- validation: `validation_passed`, `validation_failed`
- integration: `branch_merged`
- integration: `branch_merged`, `merge_conflict_detected`, `merge_conflict_resolved`, `merge_conflict_unresolved`, `merge_retry_started`
Actors can emit events in `ActorExecutionResult.events`. Pipeline status also emits default validation/execution events.
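As a sketch, an actor result carrying one of the domain events listed above might look like this (only the `events` field is named in the docs; the other shapes here are hypothetical):

```ts
// Hypothetical shapes: only `events` on the actor result is documented above.
interface DomainEvent {
  type: string;
  payload?: Record<string, unknown>;
}

interface ActorResultSketch {
  status: 'success' | 'failure';
  events: DomainEvent[];
}

const result: ActorResultSketch = {
  status: 'success',
  events: [{ type: 'merge_conflict_detected', payload: { branch: 'feature/login' } }],
};
```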
@@ -198,6 +204,30 @@ Notes:
- `security.tool.invocation_allowed`
- `security.tool.invocation_blocked`
## Claude Observability
- `CLAUDE_OBSERVABILITY_MODE=stdout` prints structured Claude query internals (tool progress, system events, stderr, result lifecycle) to stdout as JSON lines prefixed with `[claude-trace]`.
- `CLAUDE_OBSERVABILITY_MODE=file` appends the same records to `CLAUDE_OBSERVABILITY_LOG_PATH`.
- `CLAUDE_OBSERVABILITY_MODE=both` enables both outputs.
- Output samples high-frequency `tool_progress` events to avoid log flooding while retaining suppression counters.
- `assistant` and `user` message records are retained so turn flow is inspectable end-to-end.
- `CLAUDE_OBSERVABILITY_VERBOSITY=summary` stores compact metadata; `full` stores redacted full SDK message payloads.
- `CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL=true` enables sampled partial assistant stream events from the SDK.
- `CLAUDE_OBSERVABILITY_DEBUG=true` enables Claude SDK debug mode.
- `CLAUDE_OBSERVABILITY_DEBUG_LOG_PATH` writes Claude SDK debug output to a file (also enables debug mode).
- In UI/provider mode, `CLAUDE_OBSERVABILITY_LOG_PATH` resolves relative to the repo workspace root.
- UI API: `GET /api/claude-trace?limit=<n>&sessionId=<id>` reads filtered Claude trace records.
Example:
```bash
CLAUDE_OBSERVABILITY_MODE=both
CLAUDE_OBSERVABILITY_VERBOSITY=summary
CLAUDE_OBSERVABILITY_LOG_PATH=.ai_ops/events/claude-trace.ndjson
CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL=false
CLAUDE_OBSERVABILITY_DEBUG=false
```
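Programmatically, the NDJSON feed can be filtered the same way the `/api/claude-trace` endpoint is described as doing. This sketch assumes a minimal record shape (a `sessionId` field); real trace records may carry more fields:

```ts
// Hypothetical record shape; real trace records may differ.
interface TraceRecord {
  sessionId?: string;
  [k: string]: unknown;
}

// Parse NDJSON text, optionally filter by sessionId, keep the newest `limit` records.
function filterTrace(ndjson: string, sessionId?: string, limit = 100): TraceRecord[] {
  const records = ndjson
    .split('\n')
    .filter((line) => line.trim().length > 0)
    .map((line) => JSON.parse(line) as TraceRecord);
  const matched = sessionId ? records.filter((r) => r.sessionId === sessionId) : records;
  return matched.slice(-limit);
}
```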
### Analytics Quick Start
Inspect latest events:
@@ -237,6 +267,7 @@ jq -c 'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson
- Every actor execution input now includes `security` helpers (`rulesEngine`, `createCommandExecutor(...)`) so executors can enforce shell/tool policy at the execution boundary.
- Every actor execution input now includes `mcp` helpers (`resolvedConfig`, `resolveConfig(...)`, `filterToolsForProvider(...)`, `createClaudeCanUseTool()`) so provider adapters are filtered against `executionContext.allowedTools` before SDK calls.
- For Claude-based executors, pass `input.mcp.filterToolsForProvider(...)` and `input.mcp.createClaudeCanUseTool()` into the SDK call path so unauthorized tools are never exposed and runtime bypass attempts trigger security violations.
- Claude `canUseTool` permission checks normalize provider casing (`Bash` vs `bash`) before enforcing persona allowlists.
- Pipeline behavior on `SecurityViolationError` is configurable:
- `hard_abort` (default)
- `validation_fail` (retry-unrolled remediation)
@@ -254,6 +285,12 @@ jq -c 'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson
- `ANTHROPIC_API_KEY` (used when `CLAUDE_CODE_OAUTH_TOKEN` is unset)
- `CLAUDE_MODEL`
- `CLAUDE_CODE_PATH`
- `CLAUDE_OBSERVABILITY_MODE` (`off`, `stdout`, `file`, or `both`)
- `CLAUDE_OBSERVABILITY_VERBOSITY` (`summary` or `full`)
- `CLAUDE_OBSERVABILITY_LOG_PATH`
- `CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL` (`true` or `false`)
- `CLAUDE_OBSERVABILITY_DEBUG` (`true` or `false`)
- `CLAUDE_OBSERVABILITY_DEBUG_LOG_PATH`
- `MCP_CONFIG_PATH`
### Agent Manager Limits
@@ -269,6 +306,7 @@ jq -c 'select(.severity=="critical")' .ai_ops/events/runtime-events.ndjson
- `AGENT_TOPOLOGY_MAX_DEPTH`
- `AGENT_TOPOLOGY_MAX_RETRIES`
- `AGENT_RELATIONSHIP_MAX_CHILDREN`
- `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS`
### Provisioning / Resource Controls

View File

@@ -52,10 +52,16 @@ This keeps orchestration policy resolution separate from executor enforcement. E
- planning: `requirements_defined`, `tasks_planned`
- execution: `code_committed`, `task_blocked`
- validation: `validation_passed`, `validation_failed`
- integration: `branch_merged`
- integration: `branch_merged`, `merge_conflict_detected`, `merge_conflict_resolved`, `merge_conflict_unresolved`, `merge_retry_started`
- Pipeline edges can trigger on domain events (`edge.event`) in addition to legacy status triggers (`edge.on`).
- `history_has_event` route conditions evaluate persisted domain event history entries (`validation_failed`, `task_blocked`, etc.).
## Merge conflict orchestration
- Task merge/close merge operations return structured outcomes (`success`, `conflict`, `fatal_error`) instead of throwing for conflicts.
- Task state supports conflict workflows (`conflict`, `resolving_conflict`) and conflict metadata is persisted under `task.metadata.mergeConflict`.
- Conflict retries are bounded by `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS`; exhaustion emits `merge_conflict_unresolved` and the session continues without crashing.
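The bounded conflict loop described above can be sketched as follows (type and function names are illustrative, not the real API):

```ts
// Sketch only: outcome union and loop names are illustrative.
type MergeOutcome =
  | { kind: 'success' }
  | { kind: 'conflict'; files: string[] }
  | { kind: 'fatal_error'; message: string };

const MAX_CONFLICT_ATTEMPTS = 2; // stands in for AGENT_MERGE_CONFLICT_MAX_ATTEMPTS

// Walk a sequence of merge attempts and decide which domain event to emit.
function conflictLoop(attempts: MergeOutcome[]): string {
  for (let i = 0; i < attempts.length && i < MAX_CONFLICT_ATTEMPTS; i++) {
    const outcome = attempts[i];
    if (outcome.kind === 'success') return 'merge_conflict_resolved';
    if (outcome.kind === 'fatal_error') return 'hard_abort';
    // 'conflict': task moves to resolving_conflict and another attempt is made
  }
  return 'merge_conflict_unresolved'; // attempts exhausted; session continues without crashing
}
```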
## Security note
Security enforcement now lives in `src/security`:

View File

@@ -40,6 +40,7 @@ This middleware provides a first-pass hardening layer for agent-executed shell c
- `registry`: resolved runtime `McpRegistry`
- `resolveConfig(...)`: centralized MCP config resolution with persona tool-clearance applied
- `createClaudeCanUseTool()`: helper for Claude SDK `canUseTool` callback so each tool invocation is allowlist/banlist-enforced before execution
- Tool matching is case-insensitive at invocation time to handle provider-emitted names like `Bash` versus allowlist entries like `bash`.
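A minimal sketch of the case-insensitive matching described above (hypothetical helper, not the actual middleware code):

```ts
// Normalize casing on both sides so provider-emitted names like `Bash`
// match allowlist entries like `bash`.
function isToolAllowed(toolName: string, allowlist: string[]): boolean {
  const normalized = toolName.toLowerCase();
  return allowlist.some((entry) => entry.toLowerCase() === normalized);
}
```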
## Known limits and TODOs

View File

@@ -5,8 +5,147 @@
- can use any openai/anthropic models
- can use multiple sets of creds
# in progress
# Scheduled
# other scheduled
- persona definitions
- product
- task
- coder
- tester
- git
- handle basic git validation/maintenance
- edit + merge when conflict is low
- pass to dev when conflict is big
- task management flow outline
- what is hard coded?
- anything that isn't 100% reliant on an llm
- complete task, next task, etc
- task dependency graph aka the next task to be assigned is x
- giga do not ever let the agent call something like this, ban it if you can
- task assignment
- task init
-
- what is sent to llm?
- the minimum possible relevant data
- task prioritization
- subtask explosion
- "clarification needed" process
- init
- planning
- prioritization
- dependency graph
- subtasks
- task/subtask status updates (pending, in progress, done, failed)
- remove todoist mcp
# Considering
- adding a similar testing methodology to the python script with playwright and visual automation + banning them from hacking it with curl and whatnot
- add instruction to tell the agent under which circumstances it should consider using context7 and if it decides to use it how it should write code to interact with it rather than calling it directly
- pretty colors on terminal uwu
- agent names
- consider adding google gemini 3.1, even though it costs money it is the best prd drafter by far. might be good at tasks too
- list/select models
- selection per task/session/agent
- git orchestration
- merging
- symlinks
# consider adding these libs
1. The Control Flow: Keep Yours, But Upgrade the Math
Right now, your PipelineExecutor (Feedback #5) is handling scheduling and topological fan-out manually. This is where homegrown DAGs usually start breaking down as relationships get complex.
What the big frameworks use: They rely on established graph theory libraries to handle the execution order.
What you should adopt: Do not write your own DAG traversal logic. Bring in a lightweight library like graphlib (or a modern TypeScript equivalent) to handle the topological sorting.
2. Tooling and Transport: The MCP SDK
You already have a src/mcp directory, which puts you ahead of the curve. But managing the low-level JSON-RPC protocol over stdio or Server-Sent Events (SSE) is notoriously fragile.
What the big frameworks use: The official @modelcontextprotocol/sdk packages provided by Anthropic.
What you should adopt: If you aren't already, replace your custom src/mcp/converters.ts logic with the official SDK. Relying on the official standard ensures your orchestrator isn't permanently hard-coupled to your current Anthropic and OpenAI subscriptions. If you decide to point this engine at your local Ollama instance running behind Traefik, a standardized MCP transport layer guarantees your tools and context will work seamlessly across both your cloud models and your local open-weight ones.
3. Process Execution: Safer Shells
When your agents execute shell commands in that AGENT_WORKTREE_ROOT, using Node's raw child_process.exec is messy. It buffers stdout/stderr poorly and makes escaping arguments dangerous.
What the big frameworks use: zx (by Google) or execa.
What you should adopt: execa is fantastic for this. It handles process timeouts, cleans up orphaned child processes automatically (crucial for your retry-unrolled DAGs), and streams stdout natively so you can pipe it directly into your domain-events bus without memory bloat.
# Completed
1. boilerplate typescript project for claude
- mcp server support
- generic mcp handlers
- specific mcp handlers for
- context7
- claude task manager
- concurrency, configurable max agent and max depth
- Extensible Resource Provisioning
- hard constraints
- soft constraints
- basic hygiene run
# epic
- agent orchestration system improvements
# module 1
- schema driven execution engine
- specific definitions handled in AgentManifest schema
- persona registry
- templated system prompts injected with runtime context
- tool clearances (stub this for now, add TODO for security implementation)
- allowlist
- banlist
- behavioral event handlers
- define how personas react to specific events ie. onTaskComplete, onValidationFail
# module 2
- actor oriented pipeline constrained by a strict directed acyclic graph
- relationship + pipeline graphs
- multi level topology
- hierarchical ie parent spawns 3 coder children
- unrolled retry pipelines ie coder1 > QA1 > Coder2 > QA2
- sequential ie product > task > coder > QA > git
- support for constraint definition for each concept (relationship, pipeline, topology)
- ie max depth, max retries
- state dependent routings
- support branching logic based on project history or repository state ie. project init requires product agent to generate prd, then task agent needs to create roadmap, once those exist future sessions skip those agents and go straight to coder agents
# module 3
- state/context manager
- stateless handoffs
- state and context are passed forwards through payloads via worktree/storage, not conversational memory
- fresh context per node execution
# module 4
- resource provisioning
- hierarchical resource suballocation
- when a parent agent spawns children, handle local resource management
- branch/sub-worktree provisioning
- suballocating deterministic port range provisioning
- extensibility to support future resource types
# epic
implementation of AgentManager.runRecursiveAgent
@@ -68,109 +207,352 @@ implementation of AgentManager.runRecursiveAgent
- The Abort Test: Start a parent with a 5-second sleep task, cancel the session at 1 second. Assert that the underlying LLM SDK handles were aborted and resources were released.
- The Isolation Test: Spawn two children concurrently. Assert they are assigned non-overlapping port ranges and isolated worktree paths.
# Scheduled
- security implementation
- persona definitions
- product
- task
- coder
- tester
- git
- handle basic git validation/maintenance
- edit + merge when conflict is low
- pass to dev when conflict is big
# epic
- need to untangle
- what goes where in terms of DAG definition vs app logic vs agent behavior
- events
- what events do we have
- what personas care about what events
- how should a persona respond to an event
- where is this defined
- success/failure/retry policy definitions
- where does this go?
- what are they?
# connecting pipeline engine and recursive agent management
- The Pipeline Engine must be the single source of truth. The Recursive Manager should not be a separate way to run agents; it should be a Utility that the Pipeline Engine calls when it hits a node that requires a "Fan-out" (hierarchical or unrolled-retry topology).
- deprecate the standalone CLI examples for runRecursiveAgent and instead wire the Manager directly inside SchemaDrivenExecutionEngine.runSession
# execution driven topologies
- currently, manifest validates that a topology is, for example, "hierarchical." But at runtime, the code just ignores that label and runs everything sequentially
- The execution loop needs a true DAG Runner
- When the Orchestrator evaluates the next nodes to run, if it sees a "hierarchical" or "parallel" topology block, it must dispatch those nodes to the AgentManager concurrently using Promise.all(), rather than waiting for one to finish before starting the next
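The concurrent-dispatch idea can be sketched like this (`runNode` stands in for the real AgentManager dispatch; names are illustrative):

```ts
// Placeholder for dispatching one node to the AgentManager.
async function runNode(id: string): Promise<string> {
  return `${id}:done`;
}

// Hierarchical/parallel topology blocks fan out with Promise.all;
// sequential blocks await one node at a time.
async function dispatchTopology(
  kind: 'sequential' | 'hierarchical',
  nodes: string[],
): Promise<string[]> {
  if (kind === 'hierarchical') {
    return Promise.all(nodes.map((n) => runNode(n)));
  }
  const results: string[] = [];
  for (const n of nodes) results.push(await runNode(n));
  return results;
}
```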
# project scoped data store
- implement a ProjectContext store. Sessions should read from the global Project State on initialization, and write their metadata updates back to the Project State upon successful termination
- store should contain these domains
- global flags
- artifact pointers
- task queue
- dag orchestrator reads file at init and writes to it upon node completion
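A sketch of the store's JSON shape, assuming the domains listed above (field names beyond those domains are illustrative):

```ts
// Hypothetical shape mirroring the documented domains plus schemaVersion.
interface ProjectContextSketch {
  schemaVersion: number;
  globalFlags: Record<string, boolean>;
  artifactPointers: Record<string, string>;
  taskQueue: { taskId: string; status: string; worktreePath?: string }[];
}

// Empty initial state the orchestrator would read at init.
const initial: ProjectContextSketch = {
  schemaVersion: 1,
  globalFlags: {},
  artifactPointers: {},
  taskQueue: [],
};
```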
# typed domain event bus
- Implement a strongly-typed Event Bus or a Domain Event schema.
- Create a standard payload shape for events
- The Pipeline should allow edges to trigger based on specific domain events, not just basic success/fail strings
- planning events - These events occur when the project state is empty or a new major feature is requested. They transition the system from "idea" to "actionable work."
- requirements defined
- product agent triggers upon prd completion > task agent consumes
- tasks planned
- task agent triggers upon completion of dedicated claude-task-manager process > coder agent consumes
- execution events - These are the most common events. They handle the messy reality of writing code and the cyclical (but unrolled) retry pipelines.
- code committed
- task blocked (needs clarification, impossible task, max retry etc)
- validation events - These events dictate whether the DAG moves forward to integration or branches sideways into a retry pipeline.
- validation passed
- validation failed
- integration events - This event closes the loop and updates the global state.
- branch merged (also tasks updated etc)
# retry matrix and cancellation
- Implement a Status Retry Matrix and enforce AbortSignal everywhere
- Validation_Fail: Trigger the unrolled retry pipeline (send the error back to a new agent instance)
- Hard_Failure (>=2 sequential API timeouts, network drops, 403, etc): Fail fast, do not burn tokens retrying. Bubble the error up to the user
- Pass standard AbortSignal objects down into the ActorExecutionInput so the pipeline can instantly kill rogue processes.
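The hard-vs-soft split above could be sketched as a classifier (thresholds per the note; names are illustrative):

```ts
// Sketch of the retry matrix: classify a failure before deciding whether to retry.
type FailureKind = 'validation_fail' | 'hard_failure';

// `priorTimeouts` counts sequential timeouts already seen for this node.
function classify(status: number | 'timeout', priorTimeouts: number): FailureKind {
  if (status === 'timeout' && priorTimeouts >= 1) return 'hard_failure'; // >=2 sequential timeouts
  if (status === 403) return 'hard_failure'; // auth errors: fail fast, don't burn tokens
  return 'validation_fail'; // soft failure: route into the unrolled retry pipeline
}
```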
# code review epic
- Header alias inconsistency can break Claude MCP auth/config
- Normalize the config object immediately upon parsing in src/mcp/converters.ts, mapping both headers and http_headers to a single internal representation before either the Codex or Claude handlers touch them
- Update src/agents/pipeline.ts to compute an aggregate status. You should traverse the execution records and ensure all terminal nodes (leaves) in your DAG have a status of "success". If any node in the critical path fails, the whole session should be marked as a failure
- file persistence is not atomic, project-context serialization is process-local only
- Implement atomic writes
- Direct writes in state/context: src/agents/state-context.ts:203, src/agents/state-context.ts:251, src/agents/project-context.ts:171, src/agents/project-context.ts:205.
- Queue in FileSystemProjectContextStore (src/agents/project-context.ts:145) protects only within one process.
- pipeline executor owns too many responsibilities
- Start extracting distinct policies.
- Move failure classification (hard vs soft fails) into a dedicated FailurePolicy class.
- Move persistence and event emissions into a LifecycleObserver or event bus listener rather than keeping them hardcoded in the execution loop
- Global mutable MCP handler registry limits extensibility/test isolation
- Refactor the registry into an instantiable class (e.g., McpRegistry)
- Pass this instance into your SchemaDrivenExecutionEngine and PipelineExecutor via dependency injection instead of relying on auto-installing imports
- Provider example entrypoints duplicate orchestration pattern
- Create a unified helper like createSessionContext(provider, config) that handles the provisioning, probing, and prompting loop, keeping the provider-specific code strictly limited to model initialization
- Config/env parsing is duplicated
- Create a single src/config.ts (or dedicated config service) that parses process.env, validates it, applies defaults, and freezes the object.
- Inject this single source of truth throughout the app
- Project context parsing is too strict
- Update src/agents/project-context.ts:106 to merge parsed files with a set of default root keys
- Add a schemaVersion field to the JSON structure to allow for safe migrations later
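The atomic-write item earlier in this list (temp file plus rename) can be sketched as:

```ts
import { writeFileSync, renameSync, readFileSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';

// Minimal atomic-write sketch: write to a temp sibling, then rename over the target.
// rename(2) is atomic on POSIX when source and target share a filesystem,
// so readers never observe a half-written project-context file.
function writeAtomic(target: string, data: string): void {
  const tmp = `${target}.tmp-${process.pid}`;
  writeFileSync(tmp, data);
  renameSync(tmp, target);
}

// Usage sketch with a throwaway path.
const target = join(tmpdir(), 'project-context.json');
writeAtomic(target, JSON.stringify({ schemaVersion: 1 }));
```

Note this only protects within one filesystem; cross-process coordination (the queue concern above) still needs locking or a single writer.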
# security middleware
- rely on an established AST (Abstract Syntax Tree) parser for shell scripts like bash-parser to handle tokenization
- Use an off-the-shelf parser to break commands down into executable binaries, flags, arguments, and environment variable assignments. We can scrub or inject specific environment variables securely at this layer
- focus specifically on extracting Command and Word nodes from the bash-parser output
- gives us a head start on exactly what part of the syntax tree matters for the allowlist
- AI agents frequently chain commands (&&, ||, |, >) to save turns. If your parser struggles with complex pipelines or subshells, it will artificially cripple the agents' ability to work efficiently
- rules engine
- For the simplest iteration, defining your allowlists and tool clearance schema via strictly typed Zod schemas is the most lightweight approach. You validate the AST output against the schema before passing it to the execution layer
- Implement strict binary allowlists (e.g., git, npm, node, cat) and enforce directory-bound execution (ensuring the cwd stays within AGENT_WORKTREE_ROOT)
- block path traversal attempts (e.g., ../). Even if the cwd starts in the worktree, an agent might try to read or write outside of it using relative paths in its arguments
- method for logging and profiling exactly what commands Codex and Claude are currently emitting to build a baseline allowlist for longer term best practices
- make clear todos around the need to replace/improve this
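The directory-bound execution and traversal-blocking items above can be sketched with a path containment check (hypothetical helper):

```ts
import { resolve, sep } from 'node:path';

// Reject any argument path that escapes the worktree root after resolution,
// including relative traversal like "../".
function staysWithin(root: string, candidate: string): boolean {
  const resolvedRoot = resolve(root);
  const resolvedPath = resolve(resolvedRoot, candidate);
  return resolvedPath === resolvedRoot || resolvedPath.startsWith(resolvedRoot + sep);
}
```

String-prefix checks like this are a first pass only; symlinks inside the worktree still need separate handling.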
- sandbox/execution layer
- Execute commands using Node's child_process with explicitly dropped privileges (running as a non-root user via uid/gid), enforce timeouts, and stream stdout/stderr to your existing event bus for auditing.
- By default, Node child processes inherit the parent's environment variables. ensure that our env management policy is consistent and secure given this behavior
- A very modern pattern is to use your Node orchestrator to spawn a deno run child process. You can pass explicit flags like --allow-read=/target/worktree and --allow-run=git,npm. If the LLM tries to read an env file outside that directory, the Deno runtime instantly kills the process at the OS level.
- agents need to modify files in AGENT_WORKTREE_ROOT, but they must absolutely not have write access to AGENT_STATE_ROOT or AGENT_PROJECT_CONTEXT_PATH. The security middleware must strictly enforce this boundary.
- Your PipelineExecutor currently routes validation_fail into a retry-unrolled execution. You will need to define a new error class (e.g., SecurityViolationError). Should a security violation trigger a retry (telling the LLM "You can't do that, try another way"), or should it instantly hard-abort the pipeline?
- Your MCP tools currently have auto-installed builtins. The rules engine needs to apply not just to shell commands, but also to MCP tool calls. The schema for tool clearance (currently a TODO at src/agents/persona-registry.ts:79) needs to be unified with this new rules engine
- schema differences between claude and codex - no clue if we are doing anything for this
- MCP config boundary only verifies that the config is an "object" before casting it, risking late-stage crashes. Furthermore, the shared MCP type is missing the sdk (in-process) transport type supported by Claude
- Create a strict Zod schema for MCP configuration
- Define McpConfigSchema using Zod to strictly validate field shapes, ranges, and enums before handoff. Update src/mcp/types.ts to include sdk alongside stdio, http, and sse in your shared transport union
- Provider-specific MCP fields like enabled_tools and timeouts are used by Codex but silently dropped during conversion for Claude. This violates user expectations
- Fail fast or warn loudly: If the provider is set to Claude and these asymmetric fields are present in the parsed config, emit a clear warning log (e.g., [WARN] MCP field 'timeouts' is not supported by the Claude adapter and will be ignored)
- The SDK adapters are under-tested. The test suite covers converters and registries but misses the actual execution wiring, stream handling, and result parsing for Codex and Claude
- Implement integration/unit tests for the adapter boundaries
- add support for CLAUDE_CODE_OAUTH_TOKEN instead of api key
- ensure your configuration schema can accept the new OAuth token. To keep it backward-compatible with standard API keys (in case you ever need to switch back), you can check for the OAuth token first, then fall back to the standard API key.
- runClaudePrompt drops the parsed config and lets the SDK auto-discover process.env.ANTHROPIC_API_KEY.
- task management flow
- init
- planning
- prioritization
- dependency graph
- subtasks
- task/subtask status updates (pending, in progress, done, failed)
- You need to explicitly pass the anthropicToken from your configuration into the underlying Anthropic client constructor. Depending on how you are instantiating the Agent SDK, you will pass it via the authToken or apiKey property
- found evidence of this drift in src/agents/provisioning.ts#L94. You will need to apply the exact same explicit wiring pattern there. Any time you instantiate the Anthropic client or the Claude Agent in a worker node, pass { apiKey: config.provider.anthropicToken }.
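The OAuth-first fallback described above might look like this sketch (hypothetical resolver; field names are illustrative):

```ts
// Prefer the OAuth token, fall back to the standard API key, else return nothing
// and let the SDK's own login-state discovery take over.
function resolveClaudeAuth(
  env: Record<string, string | undefined>,
): { authToken?: string; apiKey?: string } {
  if (env.CLAUDE_CODE_OAUTH_TOKEN) return { authToken: env.CLAUDE_CODE_OAUTH_TOKEN };
  if (env.ANTHROPIC_API_KEY) return { apiKey: env.ANTHROPIC_API_KEY };
  return {};
}
```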
# Considering
- model selection per task/session/agent
- agent "notebook"
- agent run log
- agent persona support
- ping pong support - ie. product agent > dev agent, dev agent needs clarification = ping pong back to product. same with tester > dev.
- resume session aspect of this
- max ping pong length ie. tester can only pass back once otherwise mark as failed
- max ping pong length per relationship ie dev:git can ping pong 4 times, dev:product only once, etc
- git orchestration
- merging
- symlinks
- security
- whatever existing thing has
- banned commands (look up a git repo for this)
- front end
- list available models
- specific workflows
- ui
- ci/cd
- review
- testing
# Defer
# Won't Do
- rip out legacy/deprecated interfaces ie legacy status triggers, deprecated subagent method, etc
- recursive agent deprecation thing
# Completed
1. boilerplate typescript project for claude
- mcp server support
- generic mcp handlers
- specific mcp handlers for
- context7
- claude task manager
- concurrency, configurable max agent and max depth
- Extensible Resource Provisioning
- hard constraints
- soft constraints
- basic hygiene run
# epic
- agent orchestration system improvements
# module 1
- schema driven execution engine
- specific definitions handled in AgentManifest schema
- persona registry
- templated system prompts injected with runtime context
- tool clearances (stub this for now, add TODO for security implementation)
- allowlist
- banlist
- behavioral event handlers
- define how personas react to specific events ie. onTaskComplete, onValidationFail
# module 2
- actor oriented pipeline constrained by a strict directed acyclic graph
- relationship + pipeline graphs
- multi level topology
- hierarchical ie parent spawns 3 coder children
- unrolled retry pipelines ie coder1 > QA1 > Coder2 > QA2
- sequential ie product > task > coder > QA > git
- support for constraint definition for each concept (relationship, pipeline, topology)
- ie max depth, max retries
- state dependent routings
- support branching logic based on project history or repository state ie. project init requires product agent to generate prd, then task agent needs to create roadmap, once those exist future sessions skip those agents and go straight to coder agents
# module 3
- state/context manager
- stateless handoffs
- state and context are passed forwards through payloads via worktree/storage, not conversational memory
- fresh context per node execution
# module 4
- resource provisioning
- hierarchical resource suballocation
- when a parent agent spawns children, handle local resource management
- branch/sub-worktree provisioning
- suballocating deterministic port range provisioning
- extensibility to support future resource types
legacy/deprecated interfaces
# Phase 1: Safe & Focused Cleanups (Low/Medium Risk)
These can be bundled into a single PR or tackled as quick, independent tasks. They have minimal blast radius and clear mitigation paths.
Legacy status history duplication:
Action: Remove the historyEvent singular path in favor of the domain-event history. Migrate conditions from validation_fail to validation_failed.
Impact: Requires updating orchestration tests (tests/orchestration-engine.test.ts) and history semantics documentation.
Remove internal Claude token (anthropicToken):
Action: Simplify the resolver in src/config.ts to oauth/api only and drop the property.
Impact: Update config tests. Low risk, but constitutes an API shape change.
Remove MCP legacy header alias (http_headers):
Action: Drop from shared schema/type (src/mcp/types.ts) and converter merge logic (src/mcp/converters.ts).
Impact: Medium risk due to external config compatibility. Crucial: You must add a migration note for users utilizing external MCP configs.
Remove legacy edge trigger aliases (Alias-only):
Action: Remove the onTaskComplete and onValidationFail aliases only.
Impact: Safe to do now, as it shrinks the legacy surface area without breaking the core edge.on functionality currently heavily relied upon in tests.
# Phase 2: Staged Removals (Requires Care)
This item is deeply integrated and needs a multi-step replacement strategy rather than a direct deletion.
Deprecate runRecursiveAgent API:
Action: First, update the Pipeline (src/agents/pipeline.ts) to use the new private replacement call path for recursive execution. Only after the pipeline is successfully rerouted and the manager tests are updated should you remove the public deprecated wrapper.
README Impact: You will need to remove or update the note in the "Notes" section of your README that currently advertises AgentManager.runRecursiveAgent(...) for low-level testing.
# Phase 3 legacy/deprecated interfaces BIG AND SCARY
Switching to un-ts/sh-syntax is exactly the right move. It is a WebAssembly (WASM) wrapper around Go's highly respected mvdan/sh parser. It provides rigorous POSIX/Bash compliance and, crucially, ships with strict, native TypeScript definitions for its entire AST.
Here is the updated implementation guide tailored specifically to integrating un-ts/sh-syntax into your security middleware.
Phase 1: Dependency Migration
Remove the Legacy Code:
npm uninstall bash-parser
rm src/types/bash-parser.d.ts
Install the Replacement:
npm install sh-syntax
Phase 2: Rewrite the AST Adapter (src/security/shell-parser.ts)
The most significant architectural shift here is that sh-syntax is WASM-backed, making the parsing operation asynchronous. Your adapter and the calling security middleware must be updated to handle Promises.
You will also map your traversal logic to the mvdan/sh AST structures (e.g., File, Stmt, CallExpr, Word).
```typescript
import { parse } from "sh-syntax";

// Note: depending on your runtime environment, you may need to configure the
// WASM loader via `import { getProcessor } from "sh-syntax"` if standard Node
// resolution isn't sufficient.
//
// `SecurityViolationError` and `enforceRedirectPolicy` are assumed to already
// exist in your security middleware.

export interface CommandTarget {
  binary: string;
  args: string[];
}

export async function extractExecutionTargets(shellInput: string): Promise<CommandTarget[]> {
  const targets: CommandTarget[] = [];

  // sh-syntax parsing is async because the WASM module initializes lazily.
  const ast = await parse(shellInput);

  // Walk the AST. sh-syntax types closely mirror the mvdan/sh Go types; the
  // root is typically a File containing a list of Stmt (statements). Verify
  // the exact property names against the shipped .d.ts for your version.
  if (!ast?.StmtList?.Stmts) return targets;

  for (const stmt of ast.StmtList.Stmts) {
    const cmd = stmt.Cmd;

    // Standard function/binary call.
    if (cmd && cmd.type === "CallExpr") {
      const args = cmd.Args;
      if (args && args.length > 0) {
        // The first argument in a CallExpr is the binary name. Strictly require
        // that it is a literal Word, never a computed value.
        const binaryName = extractLiteralWord(args[0]);
        if (!binaryName) {
          throw new SecurityViolationError("Dynamic or computed binary names are blocked.");
        }
        targets.push({
          binary: binaryName,
          args: args.slice(1).map(extractLiteralWord).filter(Boolean) as string[],
        });
      }
    }

    // Important: explicitly reject subshells and any construct your engine
    // doesn't support.
    if (cmd && cmd.type === "Subshell") {
      throw new SecurityViolationError("Subshell execution is not permitted by security policy.");
    }

    // Analyze redirects to ensure they don't overwrite protected files
    // (like state roots).
    if (stmt.Redirs) {
      enforceRedirectPolicy(stmt.Redirs);
    }
  }

  return targets;
}

// Helper to safely extract string literals from Word nodes. In sh-syntax, a
// Word contains Parts (Lit, SglQuoted, DblQuoted, ParamExp, etc.); enforce that
// the parts consist only of safe literals, rejecting ParamExp ($VAR).
function extractLiteralWord(wordNode: any): string | null {
  // ...
}
```
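A self-contained sketch of the literal-word check described above. The local `Word`/`WordPart` types are stand-ins for sh-syntax's real definitions, which may differ slightly by version; the rejection logic is the point.

```typescript
// Local stand-ins for sh-syntax's Word/Part shapes (illustrative only).
type WordPart =
  | { type: "Lit"; Value: string }
  | { type: "SglQuoted"; Value: string }
  | { type: "DblQuoted"; Parts: WordPart[] }
  | { type: "ParamExp" }
  | { type: "CmdSubst" };

interface Word {
  Parts: WordPart[];
}

// Returns the joined literal string if every part is a safe literal,
// otherwise null (the caller treats null as a security violation).
function extractLiteralWord(word: Word): string | null {
  let out = "";
  for (const part of word.Parts) {
    switch (part.type) {
      case "Lit":
      case "SglQuoted":
        out += part.Value;
        break;
      case "DblQuoted": {
        // Double quotes are safe only if their contents are themselves literal.
        const inner = extractLiteralWord({ Parts: part.Parts });
        if (inner === null) return null;
        out += inner;
        break;
      }
      default:
        // ParamExp ($VAR), CmdSubst ($(...)), and anything unrecognized is rejected.
        return null;
    }
  }
  return out;
}
```

Note that this fails closed: any part type the switch does not explicitly whitelist returns null, so new AST node kinds introduced by library upgrades are blocked by default.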
Phase 3: Synchronize Orchestration & Middleware
Because extractExecutionTargets is now async:
Update SecureCommandExecutor: The initialization hook where you validate the command against your allowlists must await the parsing step (constructors cannot await, so move validation into an async factory or an explicit init method).
Actor Execution Boundary: Ensure that wherever the LLM outputs a shell command during the DAG execution, the pipeline waits for the AST security validation before proceeding.
Phase 4: Strict Schema Alignment (src/security/schemas.ts)
Your Zod schemas do not need to validate the sh-syntax AST directly (since it is already strictly typed by the library). Instead, use Zod to validate the output array (CommandTarget[]) to ensure nothing slipped past the parser.
Update the execution schema: Ensure it enforces .strict() so that if extractExecutionTargets accidentally returns extraneous fields, the engine panics and fails closed.
Unified Allowlist validation: Ensure the extracted binary string is strictly validated against your AGENT_SECURITY_ALLOWED_BINARIES Zod array.
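In plain TypeScript, the fail-closed behavior that `.strict()` buys you looks roughly like this (a sketch; the real implementation belongs in src/security/schemas.ts as a Zod schema):

```typescript
interface CommandTarget {
  binary: string;
  args: string[];
}

const ALLOWED_KEYS = new Set(["binary", "args"]);

// Mirrors z.object({...}).strict(): unknown keys are a hard failure, not ignored.
function assertCommandTargets(value: unknown): CommandTarget[] {
  if (!Array.isArray(value)) throw new Error("Expected CommandTarget[]");
  for (const entry of value) {
    if (typeof entry !== "object" || entry === null) throw new Error("Expected object entry");
    for (const key of Object.keys(entry)) {
      if (!ALLOWED_KEYS.has(key)) throw new Error(`Unexpected field "${key}"; failing closed`);
    }
    const target = entry as Partial<CommandTarget>;
    if (typeof target.binary !== "string") throw new Error("binary must be a string");
    if (!Array.isArray(target.args) || !target.args.every((a) => typeof a === "string")) {
      throw new Error("args must be string[]");
    }
  }
  return value as CommandTarget[];
}
```

The extracted `binary` strings then go through the same allowlist check as everything else, so a parser regression cannot smuggle extra fields or non-string values past the boundary.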
Phase 5: Revalidate Security Parity (tests/security-middleware.test.ts)
This is the final gate. The new AST structure handles complex bash semantics differently than the old untyped parser.
WASM Test Environment: Ensure your test runner (Jest, Vitest) is configured to load WebAssembly properly, or the parse function will throw an initialization error in CI.
Regression Threat Matrix:
echo $(unauthorized_bin) -> Must be caught and throw SecurityViolationError (note: $(...) surfaces as a CmdSubst word part rather than a Subshell node, so it is the literal-word check that blocks it).
allowed_bin && unauthorized_bin -> The StmtList must iterate over both commands and block the execution due to the second binary.
allowed_bin > /path/to/protected/file -> The Redirs property on the Stmt must trigger your boundary violation logic.
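The third matrix row exercises the redirect boundary. A minimal sketch of that check, assuming redirect targets have already been reduced to literal paths (the function name matches the hook used in the adapter; the signature is illustrative):

```typescript
import { resolve, sep } from "node:path";

// Throws if any redirect target falls inside a protected root (state roots, etc.).
function enforceRedirectPolicy(redirectTargets: string[], protectedRoots: string[]): void {
  const roots = protectedRoots.map((root) => resolve(root));
  for (const target of redirectTargets) {
    const resolved = resolve(target);
    for (const root of roots) {
      // Match the root itself or anything below it; the separator suffix
      // prevents "/repo/.ai_ops-backup" from matching "/repo/.ai_ops".
      if (resolved === root || resolved.startsWith(root + sep)) {
        throw new Error(`Redirect into protected path blocked: ${target}`);
      }
    }
  }
}
```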
- review/update of readme, docs, and conf files where needed
- mvp for analytics + user notification logging
# giga model specific behavior and strict task agent control stuff
i am too dumb to understand it, but gemini 3.1 makes it sound like a really good idea
Architecture Brief: Deterministic Agent Execution & Policy Enforcement
Context & Goal
We are refactoring the execution layer to ensure low-level control over task agents (e.g., task_sync, task_plan_llm). The goal is to move away from open-ended, non-deterministic agent behavior and enforce a strict 4-layer control model where the LLM acts only as a bounded step within a hard-coded state machine.
The Problem Statement
We currently have a critical gap in our enforcement boundary that allows policy bypasses, compounded by provider-specific SDK quirks:
The Context Drop (The MCP Gap): The mcpRegistry (which defines our tool policies) is resolved globally at the orchestration layer, but it is not passed down into pipeline.ts or the ActorExecutor. As a result, the low-level execution nodes operate without awareness of the active tool clearance policies.
The Claude SDK Leakage: The Anthropic Claude SDK currently ignores the shared enabled_tools configuration in the MCP payload. If we rely solely on the shared MCP config, Claude can hallucinate and execute unauthorized tool calls.
The Anti-Pattern Risk: The initial proposal to fix this was to pass the entire mcpRegistry down into the ActorExecutor so it could self-regulate. This is a severe anti-pattern. It tightly couples our low-level execution sandbox with our high-level orchestration logic, forcing the "dumb" executor to parse topologies, phases, and complex registry configurations.
The Solution: The ResolvedExecutionContext Pattern
To close the enforcement gap without violating the Inversion of Control principle, we will implement a strict separation of concerns. The Orchestrator will handle the logic; the Executor will handle the enforcement.
Instead of passing down the full registry, the orchestration layer will pre-compute a flat, immutable policy payload for that specific node attempt and inject it into the executor.
Implementation Directives
Introduce ResolvedExecutionContext:
Create an interface that represents the fully resolved, un-negotiable constraints for a single execution step.
```typescript
export interface ResolvedExecutionContext {
  phase: string;
  modelConstraint: string; // e.g. "claude-3-haiku"
  allowedTools: string[]; // flat array of resolved tool names
  security: {
    dropUid: boolean;
    worktreePath: string;
    // ...other hard constraints
  };
}
```
Update Orchestration (pipeline.ts):
Before invoking an actor, the pipeline must read the AgentManifest for the current node, cross-reference its toolClearance with the mcpRegistry, and generate the ResolvedExecutionContext.
Lock Down the Executor (executor.ts):
The ActorExecutor must accept this context and enforce it blindly:
Model Enforcement: Force the SDK initialization to strictly use context.modelConstraint.
Tool Enforcement (Claude Fix): Explicitly filter the tools passed into the provider SDK using context.allowedTools, physically preventing the Claude SDK from seeing tools outside its clearance.
Security Middleware: Pass context.allowedTools into the SecurityRulesEngine so any runtime attempt to bypass the SDK constraints triggers an immediate hard abort (AGENT_SECURITY_VIOLATION_MODE=hard_abort).
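The tool-enforcement step is mechanical once the context is flat. A sketch of the filter inside the executor (types and function name are illustrative; case-insensitive matching mirrors the allowlist lookup elsewhere in the pipeline):

```typescript
interface ResolvedExecutionContext {
  phase: string;
  modelConstraint: string;
  allowedTools: string[];
}

interface ProviderTool {
  name: string;
}

// The executor never consults the registry: it filters blindly against the
// pre-computed allowlist, so the provider SDK never sees unauthorized tools.
function filterToolsForProvider(
  availableTools: ProviderTool[],
  context: ResolvedExecutionContext,
): ProviderTool[] {
  const allowed = new Set(context.allowedTools.map((name) => name.trim().toLowerCase()));
  return availableTools.filter((tool) => allowed.has(tool.name.toLowerCase()));
}
```

Because the Claude SDK ignores the shared enabled_tools config, this filter is what actually closes the leakage: an unauthorized tool is simply absent from the SDK payload rather than merely disallowed by policy.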
Expected Outcome
The execution nodes remain entirely decoupled from the orchestration state. The LLM cannot escalate its model tier or access unauthorized tools, and the provider SDK quirks are mitigated at the execution boundary.
# epic
# front end ui requirements
1. Graph Visualizer
Your initial thoughts on coloring by stage/agent and showing metadata (subtasks, tool calls, security violations) are spot-on. Because your backend relies heavily on DAG execution and a retry matrix, the visualizer will be the most critical piece of the UI.
What else is worth visualizing?
Based on your README, here are specific concepts you should expose on the graph:
Topology & Control Flow: Visually distinguish between sequential, parallel, hierarchical, and retry-unrolled branches. For example, a retry-unrolled node should visually indicate that it spawned a new child manager session to remediate a validation_fail.
Domain Event Edges: Since your pipeline edges route via typed events (requirements_defined, validation_failed), labeling the edges of the graph with the specific domain event that triggered the transition will make debugging orchestration loops much easier.
Economics & Performance (from Runtime Events): Your NDJSON events log tokenInput, tokenOutput, durationMs, and costUsd. Surfacing the "cost" or "time" of a specific DAG node directly on the graph helps identify inefficient prompts or agents.
The "Sandbox Payload": when a user clicks or hovers over a specific node (e.g., task_plan_llm), the UI must display the ResolvedExecutionContext payload that was injected into it.
Critical Path & Abort Status: If a session fails due to two consecutive hard failures, visually highlighting the exact "critical path" that led to the AbortSignal cascading through the system will save hours of log-diving.
2. Notification / Webhook Interface
Your backend already has an elegant fan-out system (NDJSON analytics log + Discord webhook). The UI should act as a control panel and an in-app inbox for this.
Configuration: A form to manage AGENT_RUNTIME_DISCORD_WEBHOOK_URL, AGENT_RUNTIME_DISCORD_MIN_SEVERITY, and the ALWAYS_NOTIFY_TYPES CSV.
Live Event Feed: A real-time drawer or panel that tails the .ai_ops/events/runtime-events.ndjson file. You can parse the severity field to color-code the feed (e.g., flashing red for critical security mirror events like security.shell.command_blocked).
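Tailing the feed is a line-by-line JSON parse plus a severity lookup. A sketch (field names follow the runtime-events description above; verify against the actual log schema):

```typescript
type Severity = "info" | "warning" | "critical";

interface RuntimeEvent {
  type: string;
  severity: Severity;
  message?: string;
}

const SEVERITY_COLOR: Record<Severity, string> = {
  info: "gray",
  warning: "amber",
  critical: "red",
};

// Parses one NDJSON chunk into renderable feed entries, skipping blank and
// partial trailing lines (a live tail often ends mid-record).
function parseEventFeedChunk(chunk: string): Array<{ event: RuntimeEvent; color: string }> {
  const entries: Array<{ event: RuntimeEvent; color: string }> = [];
  for (const line of chunk.split("\n")) {
    if (!line.trim()) continue;
    try {
      const event = JSON.parse(line) as RuntimeEvent;
      entries.push({ event, color: SEVERITY_COLOR[event.severity] ?? "gray" });
    } catch {
      // Incomplete trailing line from a live tail; wait for more data.
    }
  }
  return entries;
}
```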
3. Job Trigger Interface
This is your execution entrypoint (SchemaDrivenExecutionEngine.runSession).
Inputs: A clean interface to provide the initial prompt/task, select the Manifest or Topology they want to run, and override global flags.
The "Kill Switch": Since every actor execution respects an AbortSignal, your UI needs a prominent, highly responsive "Cancel Run" button that immediately aborts child recursive work.
Run History: A table view summarizing aggregate session status from AGENT_STATE_ROOT, allowing users to click into past runs to view their graph state.
4. Definition Interface (Manifest, Config, Security)
You noted that anything secure stays on the backend. The frontend here should strictly be a client that reads/writes validated JSON or environment schemas.
Manifest Builder: A UI to visually build or edit the AgentManifest (Schema "1"), defining personas, tool-clearance policies, modelConstraint (or allowedModel), and setting maxDepth/maxRetries.
Security Policy Management: An interface mapped to src/security/schemas.ts. This allows admins to define AGENT_SECURITY_ALLOWED_BINARIES, toggle AGENT_SECURITY_VIOLATION_MODE (hard_abort vs validation_fail), and manage MCP tool allowlists/banlists.
Environment & Resource Limits: Simple forms to configure agent manager limits (AGENT_MAX_CONCURRENT) and port block sizing without manually editing the .env file.


@@ -2,9 +2,14 @@ import { randomUUID } from "node:crypto";
import type { JsonObject } from "./types.js";
export type PlanningDomainEventType = "requirements_defined" | "tasks_planned";
export type ExecutionDomainEventType = "code_committed" | "task_blocked";
export type ExecutionDomainEventType = "code_committed" | "task_blocked" | "task_ready_for_review";
export type ValidationDomainEventType = "validation_passed" | "validation_failed";
export type IntegrationDomainEventType = "branch_merged";
export type IntegrationDomainEventType =
| "branch_merged"
| "merge_conflict_detected"
| "merge_conflict_resolved"
| "merge_conflict_unresolved"
| "merge_retry_started";
export type DomainEventType =
| PlanningDomainEventType
@@ -46,9 +51,14 @@ const DOMAIN_EVENT_TYPES = new Set<DomainEventType>([
"tasks_planned",
"code_committed",
"task_blocked",
"task_ready_for_review",
"validation_passed",
"validation_failed",
"branch_merged",
"merge_conflict_detected",
"merge_conflict_resolved",
"merge_conflict_unresolved",
"merge_retry_started",
]);
export function isDomainEventType(value: string): value is DomainEventType {


@@ -50,10 +50,14 @@ function toNodeAttemptSeverity(status: ActorResultStatus): RuntimeEventSeverity
}
function toDomainEventSeverity(type: DomainEventType): RuntimeEventSeverity {
if (type === "task_blocked") {
if (type === "task_blocked" || type === "merge_conflict_unresolved") {
return "critical";
}
if (type === "validation_failed") {
if (
type === "validation_failed" ||
type === "merge_conflict_detected" ||
type === "merge_retry_started"
) {
return "warning";
}
return "info";


@@ -2,6 +2,7 @@ import { resolve } from "node:path";
import { getConfig, loadConfig, type AppConfig } from "../config.js";
import { createDefaultMcpRegistry, loadMcpConfigFromEnv, McpRegistry } from "../mcp.js";
import { parseAgentManifest, type AgentManifest } from "./manifest.js";
import type { DomainEventEmission } from "./domain-events.js";
import { AgentManager } from "./manager.js";
import {
PersonaRegistry,
@@ -13,10 +14,16 @@ import {
type ActorExecutionSecurityContext,
type ActorExecutor,
type PipelineRunSummary,
type TaskExecutionLifecycle,
} from "./pipeline.js";
import { FileSystemProjectContextStore } from "./project-context.js";
import {
FileSystemProjectContextStore,
type ProjectTask,
type ProjectTaskStatus,
} from "./project-context.js";
import { FileSystemStateContextManager, type StoredSessionState } from "./state-context.js";
import type { JsonObject } from "./types.js";
import { SessionWorktreeManager, type SessionMetadata } from "./session-lifecycle.js";
import {
SecureCommandExecutor,
type SecurityAuditEvent,
@@ -38,6 +45,7 @@ export type OrchestrationSettings = {
maxDepth: number;
maxRetries: number;
maxChildren: number;
mergeConflictMaxAttempts: number;
securityViolationHandling: "hard_abort" | "validation_fail";
runtimeContext: Record<string, string | number | boolean>;
};
@@ -56,6 +64,7 @@ export function loadOrchestrationSettingsFromEnv(
maxDepth: config.orchestration.maxDepth,
maxRetries: config.orchestration.maxRetries,
maxChildren: config.orchestration.maxChildren,
mergeConflictMaxAttempts: config.orchestration.mergeConflictMaxAttempts,
securityViolationHandling: config.security.violationHandling,
};
}
@@ -181,6 +190,9 @@ function createActorSecurityContext(input: {
type: `security.${event.type}`,
severity: mapSecurityAuditSeverity(event),
message: toSecurityAuditMessage(event),
...(event.sessionId ? { sessionId: event.sessionId } : {}),
...(event.nodeId ? { nodeId: event.nodeId } : {}),
...(typeof event.attempt === "number" ? { attempt: event.attempt } : {}),
metadata: toSecurityAuditMetadata(event),
});
};
@@ -221,6 +233,57 @@ function createActorSecurityContext(input: {
};
}
function resolveSessionProjectContextPath(stateRoot: string, sessionId: string): string {
return resolve(stateRoot, sessionId, "project-context.json");
}
function readTaskIdFromPayload(payload: JsonObject, fallback: string): string {
const candidates = [payload.taskId, payload.task_id, payload.task];
for (const candidate of candidates) {
if (typeof candidate === "string" && candidate.trim().length > 0) {
return candidate.trim();
}
}
return fallback;
}
function toTaskStatusForFailure(
resultStatus: "validation_fail" | "failure",
statusAtStart: string,
): ProjectTaskStatus {
if (resultStatus === "failure") {
return "failed";
}
if (statusAtStart === "conflict" || statusAtStart === "resolving_conflict") {
return "conflict";
}
return "in_progress";
}
function shouldMergeFromStatus(statusAtStart: string): boolean {
return statusAtStart === "review" || statusAtStart === "resolving_conflict";
}
function toTaskIdLabel(task: ProjectTask): string {
return task.taskId || task.id || "task";
}
function toJsonObject(value: unknown): JsonObject | undefined {
if (!value || typeof value !== "object" || Array.isArray(value)) {
return undefined;
}
return value as JsonObject;
}
function readMergeConflictAttempts(metadata: JsonObject | undefined): number {
const record = toJsonObject(metadata?.mergeConflict);
const attempts = record?.attempts;
if (typeof attempts === "number" && Number.isInteger(attempts) && attempts >= 0) {
return attempts;
}
return 0;
}
export class SchemaDrivenExecutionEngine {
private readonly manifest: AgentManifest;
private readonly personaRegistry = new PersonaRegistry();
@@ -234,6 +297,7 @@ export class SchemaDrivenExecutionEngine {
private readonly mcpRegistry: McpRegistry;
private readonly runtimeEventPublisher: RuntimeEventPublisher;
private readonly securityContext: ActorExecutionSecurityContext;
private readonly sessionWorktreeManager: SessionWorktreeManager;
constructor(input: {
manifest: AgentManifest | unknown;
@@ -260,6 +324,8 @@ export class SchemaDrivenExecutionEngine {
maxDepth: input.settings?.maxDepth ?? config.orchestration.maxDepth,
maxRetries: input.settings?.maxRetries ?? config.orchestration.maxRetries,
maxChildren: input.settings?.maxChildren ?? config.orchestration.maxChildren,
mergeConflictMaxAttempts:
input.settings?.mergeConflictMaxAttempts ?? config.orchestration.mergeConflictMaxAttempts,
securityViolationHandling:
input.settings?.securityViolationHandling ?? config.security.violationHandling,
runtimeContext: {
@@ -273,6 +339,10 @@ export class SchemaDrivenExecutionEngine {
this.projectContextStore = new FileSystemProjectContextStore({
filePath: this.settings.projectContextPath,
});
this.sessionWorktreeManager = new SessionWorktreeManager({
worktreeRoot: resolve(this.settings.workspaceRoot, this.config.provisioning.gitWorktree.rootDirectory),
baseRef: this.config.provisioning.gitWorktree.baseRef,
});
this.actorExecutors = toExecutorMap(input.actorExecutors);
this.manager =
@@ -352,9 +422,22 @@ export class SchemaDrivenExecutionEngine {
initialPayload: JsonObject;
initialState?: Partial<StoredSessionState>;
signal?: AbortSignal;
sessionMetadata?: SessionMetadata;
}): Promise<PipelineRunSummary> {
const managerSessionId = `${input.sessionId}__pipeline`;
const managerSession = this.manager.createSession(managerSessionId);
const workspaceRoot = input.sessionMetadata?.baseWorkspacePath ?? this.settings.workspaceRoot;
const projectContextStore = input.sessionMetadata
? new FileSystemProjectContextStore({
filePath: resolveSessionProjectContextPath(this.settings.stateRoot, input.sessionId),
})
: this.projectContextStore;
const taskLifecycle = input.sessionMetadata
? this.createTaskExecutionLifecycle({
session: input.sessionMetadata,
projectContextStore,
})
: undefined;
const executor = new PipelineExecutor(
this.manifest,
@@ -362,25 +445,26 @@ export class SchemaDrivenExecutionEngine {
this.stateManager,
this.actorExecutors,
{
workspaceRoot: this.settings.workspaceRoot,
workspaceRoot,
runtimeContext: this.settings.runtimeContext,
defaultModelConstraint: this.config.provider.claudeModel,
resolvedExecutionSecurityConstraints: {
dropUid: this.config.security.dropUid !== undefined,
dropGid: this.config.security.dropGid !== undefined,
worktreePath: this.settings.workspaceRoot,
worktreePath: workspaceRoot,
violationMode: this.settings.securityViolationHandling,
},
maxDepth: Math.min(this.settings.maxDepth, this.manifest.topologyConstraints.maxDepth),
maxRetries: Math.min(this.settings.maxRetries, this.manifest.topologyConstraints.maxRetries),
manager: this.manager,
managerSessionId,
projectContextStore: this.projectContextStore,
resolveMcpConfig: ({ providerHint, prompt, toolClearance }) =>
projectContextStore,
resolveMcpConfig: ({ providerHint, prompt, toolClearance, workingDirectory }) =>
loadMcpConfigFromEnv(
{
providerHint,
prompt,
...(workingDirectory ? { workingDirectory } : {}),
},
{
config: this.config,
@@ -391,6 +475,7 @@ export class SchemaDrivenExecutionEngine {
securityViolationHandling: this.settings.securityViolationHandling,
securityContext: this.securityContext,
runtimeEventPublisher: this.runtimeEventPublisher,
...(taskLifecycle ? { taskLifecycle } : {}),
},
);
try {
@@ -405,6 +490,334 @@ export class SchemaDrivenExecutionEngine {
}
}
private createTaskExecutionLifecycle(input: {
session: SessionMetadata;
projectContextStore: FileSystemProjectContextStore;
}): TaskExecutionLifecycle {
return {
prepareTaskExecution: async ({ node, context }) => {
const taskId = readTaskIdFromPayload(context.handoff.payload, node.id);
const projectContext = await input.projectContextStore.readState();
const existing = projectContext.taskQueue.find(
(task) => toTaskIdLabel(task) === taskId,
);
const ensured = await this.sessionWorktreeManager.ensureTaskWorktree({
sessionId: input.session.sessionId,
taskId,
baseWorkspacePath: input.session.baseWorkspacePath,
...(existing?.worktreePath ? { existingWorktreePath: existing.worktreePath } : {}),
});
const statusAtStart: ProjectTaskStatus =
existing?.status === "review" ||
existing?.status === "conflict" ||
existing?.status === "resolving_conflict"
? existing.status
: "in_progress";
await input.projectContextStore.patchState({
upsertTasks: [
{
taskId,
id: taskId,
status: statusAtStart,
worktreePath: ensured.taskWorktreePath,
...(existing?.title ? { title: existing.title } : { title: taskId }),
...(existing?.metadata ? { metadata: existing.metadata } : {}),
},
],
});
return {
taskId,
worktreePath: ensured.taskWorktreePath,
statusAtStart,
...(existing?.metadata ? { metadata: existing.metadata } : {}),
};
},
finalizeTaskExecution: async ({ task, result, domainEvents }) => {
const emittedTypes = new Set(domainEvents.map((event) => event.type));
const additionalEvents: DomainEventEmission[] = [];
const emitEvent = (
type: DomainEventEmission["type"],
payload?: DomainEventEmission["payload"],
): void => {
if (emittedTypes.has(type)) {
return;
}
emittedTypes.add(type);
additionalEvents.push(payload ? { type, payload } : { type });
};
if (result.status === "failure" || result.status === "validation_fail") {
await input.projectContextStore.patchState({
upsertTasks: [
{
taskId: task.taskId,
id: task.taskId,
status: toTaskStatusForFailure(result.status, task.statusAtStart),
worktreePath: task.worktreePath,
title: task.taskId,
...(task.metadata ? { metadata: task.metadata } : {}),
},
],
});
return;
}
if (task.statusAtStart === "conflict") {
const attempts = readMergeConflictAttempts(task.metadata);
const metadata: JsonObject = {
...(task.metadata ?? {}),
mergeConflict: {
attempts,
maxAttempts: this.settings.mergeConflictMaxAttempts,
status: "resolved",
resolvedAt: new Date().toISOString(),
},
};
await input.projectContextStore.patchState({
upsertTasks: [
{
taskId: task.taskId,
id: task.taskId,
status: "resolving_conflict",
worktreePath: task.worktreePath,
title: task.taskId,
metadata,
},
],
});
emitEvent("merge_conflict_resolved", {
summary: `Merge conflicts resolved for task "${task.taskId}".`,
details: {
taskId: task.taskId,
worktreePath: task.worktreePath,
attempts,
},
});
return {
additionalEvents,
handoffPayloadPatch: {
taskId: task.taskId,
worktreePath: task.worktreePath,
mergeConflictStatus: "resolved",
mergeConflictAttempts: attempts,
} as JsonObject,
};
}
if (shouldMergeFromStatus(task.statusAtStart)) {
const attemptsBeforeMerge = readMergeConflictAttempts(task.metadata);
if (task.statusAtStart === "resolving_conflict") {
emitEvent("merge_retry_started", {
summary: `Retrying merge for task "${task.taskId}".`,
details: {
taskId: task.taskId,
worktreePath: task.worktreePath,
nextAttempt: attemptsBeforeMerge + 1,
maxAttempts: this.settings.mergeConflictMaxAttempts,
},
});
}
const mergeOutcome = await this.sessionWorktreeManager.mergeTaskIntoBase({
taskId: task.taskId,
baseWorkspacePath: input.session.baseWorkspacePath,
taskWorktreePath: task.worktreePath,
});
if (mergeOutcome.kind === "success") {
await input.projectContextStore.patchState({
upsertTasks: [
{
taskId: task.taskId,
id: task.taskId,
status: "merged",
title: task.taskId,
metadata: {
...(task.metadata ?? {}),
mergeConflict: {
attempts: attemptsBeforeMerge,
maxAttempts: this.settings.mergeConflictMaxAttempts,
status: "merged",
mergedAt: new Date().toISOString(),
},
},
},
],
});
emitEvent("branch_merged", {
summary: `Task "${task.taskId}" merged into session base branch.`,
details: {
taskId: task.taskId,
worktreePath: task.worktreePath,
},
});
return {
additionalEvents,
handoffPayloadPatch: {
taskId: task.taskId,
mergeStatus: "merged",
} as JsonObject,
};
}
if (mergeOutcome.kind === "conflict") {
const attempts = attemptsBeforeMerge + 1;
const exhausted = attempts >= this.settings.mergeConflictMaxAttempts;
const metadata: JsonObject = {
...(task.metadata ?? {}),
mergeConflict: {
attempts,
maxAttempts: this.settings.mergeConflictMaxAttempts,
status: exhausted ? "unresolved" : "conflict",
conflictFiles: mergeOutcome.conflictFiles,
worktreePath: mergeOutcome.worktreePath,
detectedAt: new Date().toISOString(),
...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}),
},
};
await input.projectContextStore.patchState({
upsertTasks: [
{
taskId: task.taskId,
id: task.taskId,
status: "conflict",
worktreePath: task.worktreePath,
title: task.taskId,
metadata,
},
],
});
emitEvent("merge_conflict_detected", {
summary: `Merge conflict detected for task "${task.taskId}".`,
details: {
taskId: task.taskId,
worktreePath: mergeOutcome.worktreePath,
conflictFiles: mergeOutcome.conflictFiles,
attempts,
maxAttempts: this.settings.mergeConflictMaxAttempts,
...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}),
},
});
if (exhausted) {
emitEvent("merge_conflict_unresolved", {
summary:
`Merge conflict attempts exhausted for task "${task.taskId}" ` +
`(${String(attempts)}/${String(this.settings.mergeConflictMaxAttempts)}).`,
details: {
taskId: task.taskId,
worktreePath: mergeOutcome.worktreePath,
conflictFiles: mergeOutcome.conflictFiles,
attempts,
maxAttempts: this.settings.mergeConflictMaxAttempts,
},
});
}
return {
additionalEvents,
handoffPayloadPatch: {
taskId: task.taskId,
worktreePath: task.worktreePath,
mergeConflictStatus: exhausted ? "unresolved" : "conflict",
mergeConflictAttempts: attempts,
mergeConflictMaxAttempts: this.settings.mergeConflictMaxAttempts,
mergeConflictFiles: mergeOutcome.conflictFiles,
...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}),
} as JsonObject,
};
}
await input.projectContextStore.patchState({
upsertTasks: [
{
taskId: task.taskId,
id: task.taskId,
status: "failed",
worktreePath: task.worktreePath,
title: task.taskId,
metadata: {
...(task.metadata ?? {}),
mergeConflict: {
attempts: attemptsBeforeMerge,
maxAttempts: this.settings.mergeConflictMaxAttempts,
status: "fatal_error",
error: mergeOutcome.error,
...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}),
},
},
},
],
});
emitEvent("merge_conflict_unresolved", {
summary: `Fatal merge error for task "${task.taskId}".`,
details: {
taskId: task.taskId,
worktreePath: mergeOutcome.worktreePath,
error: mergeOutcome.error,
...(mergeOutcome.mergeBase ? { mergeBase: mergeOutcome.mergeBase } : {}),
},
});
emitEvent("task_blocked", {
summary: `Task "${task.taskId}" blocked due to fatal merge error.`,
details: {
taskId: task.taskId,
error: mergeOutcome.error,
},
});
return {
additionalEvents,
handoffPayloadPatch: {
taskId: task.taskId,
worktreePath: task.worktreePath,
mergeStatus: "fatal_error",
mergeError: mergeOutcome.error,
} as JsonObject,
};
}
const nextMetadata = task.metadata
? {
...task.metadata,
}
: undefined;
await input.projectContextStore.patchState({
upsertTasks: [
{
taskId: task.taskId,
id: task.taskId,
status: "review",
worktreePath: task.worktreePath,
title: task.taskId,
...(nextMetadata ? { metadata: nextMetadata } : {}),
},
],
});
if (additionalEvents.length > 0) {
return {
additionalEvents,
};
}
return;
},
};
}
private assertRelationshipConstraints(): void {
for (const [parent, edges] of this.childrenByParent.entries()) {
if (edges.length > this.settings.maxChildren) {


@@ -107,6 +107,8 @@ export type ResolvedExecutionContext = {
export type ActorExecutionInput = {
sessionId: string;
attempt: number;
depth: number;
node: PipelineNode;
prompt: string;
context: NodeExecutionContext;
@@ -153,6 +155,7 @@ export type PipelineExecutorOptions = {
securityViolationHandling?: SecurityViolationHandling;
securityContext?: ActorExecutionSecurityContext;
runtimeEventPublisher?: RuntimeEventPublisher;
taskLifecycle?: TaskExecutionLifecycle;
};
export type ActorExecutionSecurityContext = {
@@ -166,6 +169,34 @@ export type ActorExecutionSecurityContext = {
}) => SecureCommandExecutor;
};
export type TaskExecutionResolution = {
taskId: string;
worktreePath: string;
statusAtStart: string;
metadata?: JsonObject;
};
export type TaskExecutionLifecycle = {
prepareTaskExecution: (input: {
sessionId: string;
node: PipelineNode;
context: NodeExecutionContext;
}) => Promise<TaskExecutionResolution>;
finalizeTaskExecution: (input: {
sessionId: string;
node: PipelineNode;
task: TaskExecutionResolution;
result: ActorExecutionResult;
domainEvents: DomainEvent[];
}) => Promise<
| void
| {
additionalEvents?: DomainEventEmission[];
handoffPayloadPatch?: JsonObject;
}
>;
};
type QueueItem = {
nodeId: string;
depth: number;
@@ -458,6 +489,38 @@ function toToolNameCandidates(toolName: string): string[] {
return dedupeStrings(candidates);
}
function buildCaseInsensitiveToolLookup(tools: readonly string[]): Map<string, string> {
const lookup = new Map<string, string>();
for (const tool of tools) {
const normalized = tool.trim().toLowerCase();
if (!normalized || lookup.has(normalized)) {
continue;
}
lookup.set(normalized, tool);
}
return lookup;
}
function resolveAllowedToolMatch(input: {
candidates: readonly string[];
allowset: ReadonlySet<string>;
caseInsensitiveLookup: ReadonlyMap<string, string>;
}): string | undefined {
const direct = input.candidates.find((candidate) => input.allowset.has(candidate));
if (direct) {
return direct;
}
for (const candidate of input.candidates) {
const match = input.caseInsensitiveLookup.get(candidate.toLowerCase());
if (match) {
return match;
}
}
return undefined;
}
function defaultEventPayloadForStatus(status: ActorResultStatus): DomainEventPayload {
if (status === "success") {
return {
@@ -580,9 +643,11 @@ export class PipelineExecutor {
globalFlags: { ...projectContext.globalFlags },
artifactPointers: { ...projectContext.artifactPointers },
taskQueue: projectContext.taskQueue.map((task) => ({
id: task.id,
title: task.title,
taskId: task.taskId,
id: task.id ?? task.taskId,
...(task.title ? { title: task.title } : {}),
status: task.status,
...(task.worktreePath ? { worktreePath: task.worktreePath } : {}),
...(task.assignee ? { assignee: task.assignee } : {}),
...(task.metadata ? { metadata: task.metadata } : {}),
})),
@@ -854,6 +919,13 @@ export class PipelineExecutor {
})();
const context = await this.stateManager.buildFreshNodeContext(sessionId, node.id);
const taskResolution = this.options.taskLifecycle
? await this.options.taskLifecycle.prepareTaskExecution({
sessionId,
node,
context,
})
: undefined;
const prompt = this.personaRegistry.renderSystemPrompt({
personaId: node.personaId,
runtimeContext: {
@@ -869,10 +941,13 @@ export class PipelineExecutor {
node,
toolClearance,
prompt,
worktreePathOverride: taskResolution?.worktreePath,
});
const result = await this.invokeActorExecutor({
sessionId,
attempt,
depth: recursiveDepth,
node,
prompt,
context,
@@ -889,12 +964,49 @@ export class PipelineExecutor {
customEvents: result.events,
});
const topologyKind: NodeTopologyKind = node.topology?.kind ?? "sequential";
const payloadForNext = result.payload ?? context.handoff.payload;
let payloadForNext: JsonObject = {
...context.handoff.payload,
...(result.payload ?? {}),
...(taskResolution
? {
taskId: taskResolution.taskId,
worktreePath: taskResolution.worktreePath,
}
: {}),
};
const shouldRetry =
result.status === "validation_fail" &&
this.shouldRetryValidation(node) &&
attempt <= maxRetriesForNode;
if (taskResolution && this.options.taskLifecycle) {
const finalization = await this.options.taskLifecycle.finalizeTaskExecution({
sessionId,
node,
task: taskResolution,
result,
domainEvents,
});
for (const eventEmission of finalization?.additionalEvents ?? []) {
domainEvents.push(
createDomainEvent({
type: eventEmission.type,
source: "pipeline",
sessionId,
nodeId: node.id,
attempt,
payload: eventEmission.payload,
}),
);
}
if (finalization?.handoffPayloadPatch) {
payloadForNext = {
...payloadForNext,
...finalization.handoffPayloadPatch,
};
}
}
await this.lifecycleObserver.onNodeAttempt({
sessionId,
node,
@@ -989,6 +1101,8 @@ export class PipelineExecutor {
private async invokeActorExecutor(input: {
sessionId: string;
attempt: number;
depth: number;
node: PipelineNode;
prompt: string;
context: NodeExecutionContext;
@@ -1001,12 +1115,20 @@ export class PipelineExecutor {
return await input.executor({
sessionId: input.sessionId,
attempt: input.attempt,
depth: input.depth,
node: input.node,
prompt: input.prompt,
context: input.context,
signal: input.signal,
executionContext: input.executionContext,
mcp: this.buildActorMcpContext(input.executionContext, input.prompt),
mcp: this.buildActorMcpContext({
sessionId: input.sessionId,
nodeId: input.node.id,
attempt: input.attempt,
executionContext: input.executionContext,
prompt: input.prompt,
}),
security: this.securityContext,
});
} catch (error) {
@@ -1047,9 +1169,15 @@ export class PipelineExecutor {
node: PipelineNode;
toolClearance: ToolClearancePolicy;
prompt: string;
worktreePathOverride?: string;
}): ResolvedExecutionContext {
const normalizedToolClearance = parseToolClearancePolicy(input.toolClearance);
const toolUniverse = this.resolveAvailableToolsForAttempt(normalizedToolClearance, input.prompt);
const worktreePath = input.worktreePathOverride ?? this.options.resolvedExecutionSecurityConstraints.worktreePath;
const toolUniverse = this.resolveAvailableToolsForAttempt({
toolClearance: normalizedToolClearance,
prompt: input.prompt,
worktreePath,
});
const allowedTools = this.resolveAllowedToolsForAttempt({
toolClearance: normalizedToolClearance,
toolUniverse,
@@ -1065,6 +1193,7 @@ export class PipelineExecutor {
allowedTools,
security: {
...this.options.resolvedExecutionSecurityConstraints,
worktreePath,
},
};
}
@@ -1087,15 +1216,20 @@ export class PipelineExecutor {
return [];
}
private resolveAvailableToolsForAttempt(toolClearance: ToolClearancePolicy, prompt: string): string[] {
private resolveAvailableToolsForAttempt(input: {
toolClearance: ToolClearancePolicy;
prompt: string;
worktreePath: string;
}): string[] {
if (!this.options.resolveMcpConfig) {
return [];
}
const resolved = this.options.resolveMcpConfig({
providerHint: "codex",
prompt,
toolClearance,
prompt: input.prompt,
workingDirectory: input.worktreePath,
toolClearance: input.toolClearance,
});
const rawServers = resolved.codexConfig?.mcp_servers;
@@ -1115,10 +1249,14 @@ export class PipelineExecutor {
return dedupeStrings(tools);
}
private buildActorMcpContext(
executionContext: ResolvedExecutionContext,
prompt: string,
): ActorExecutionMcpContext {
private buildActorMcpContext(input: {
sessionId: string;
nodeId: string;
attempt: number;
executionContext: ResolvedExecutionContext;
prompt: string;
}): ActorExecutionMcpContext {
const { executionContext, prompt } = input;
const toolPolicy = toAllowedToolPolicy(executionContext.allowedTools);
const filterToolsForProvider = (tools: string[]): string[] => {
const deduped = dedupeStrings(tools);
@@ -1129,6 +1267,7 @@ export class PipelineExecutor {
? this.options.resolveMcpConfig({
providerHint: "both",
prompt,
workingDirectory: executionContext.security.worktreePath,
toolClearance: toolPolicy,
})
: {};
@@ -1137,7 +1276,12 @@ export class PipelineExecutor {
executionContext.allowedTools,
);
const resolveConfig = (context: McpLoadContext = {}): LoadedMcpConfig => {
if (context.providerHint === "codex") {
const withWorkingDirectory: McpLoadContext = {
...context,
...(context.workingDirectory ? {} : { workingDirectory: executionContext.security.worktreePath }),
};
if (withWorkingDirectory.providerHint === "codex") {
return {
...(resolvedConfig.codexConfig ? { codexConfig: cloneMcpConfig(resolvedConfig).codexConfig } : {}),
...(resolvedConfig.sourcePath ? { sourcePath: resolvedConfig.sourcePath } : {}),
@@ -1147,7 +1291,7 @@ export class PipelineExecutor {
};
}
if (context.providerHint === "claude") {
if (withWorkingDirectory.providerHint === "claude") {
return {
...(resolvedConfig.claudeMcpServers
? { claudeMcpServers: cloneMcpConfig(resolvedConfig).claudeMcpServers }
@@ -1163,7 +1307,12 @@ export class PipelineExecutor {
};
const createToolPermissionHandler = (): ActorToolPermissionHandler =>
this.createToolPermissionHandler(executionContext.allowedTools);
this.createToolPermissionHandler({
allowedTools: executionContext.allowedTools,
sessionId: input.sessionId,
nodeId: input.nodeId,
attempt: input.attempt,
});
return {
allowedTools: [...executionContext.allowedTools],
@@ -1175,10 +1324,21 @@ export class PipelineExecutor {
};
}
private createToolPermissionHandler(allowedTools: readonly string[]): ActorToolPermissionHandler {
const allowset = new Set(allowedTools);
private createToolPermissionHandler(input: {
allowedTools: readonly string[];
sessionId: string;
nodeId: string;
attempt: number;
}): ActorToolPermissionHandler {
const allowset = new Set(input.allowedTools);
const caseInsensitiveAllowLookup = buildCaseInsensitiveToolLookup(input.allowedTools);
const rulesEngine = this.securityContext?.rulesEngine;
const toolPolicy = toAllowedToolPolicy(allowedTools);
const toolPolicy = toAllowedToolPolicy(input.allowedTools);
const toolAuditContext = {
sessionId: input.sessionId,
nodeId: input.nodeId,
attempt: input.attempt,
};
return async (toolName, _input, options) => {
const toolUseID = options.toolUseID;
@@ -1192,11 +1352,16 @@ export class PipelineExecutor {
}
const candidates = toToolNameCandidates(toolName);
const allowMatch = candidates.find((candidate) => allowset.has(candidate));
const allowMatch = resolveAllowedToolMatch({
candidates,
allowset,
caseInsensitiveLookup: caseInsensitiveAllowLookup,
});
if (!allowMatch) {
rulesEngine?.assertToolInvocationAllowed({
tool: candidates[0] ?? toolName,
toolClearance: toolPolicy,
context: toolAuditContext,
});
return {
behavior: "deny",
@@ -1209,6 +1374,7 @@ export class PipelineExecutor {
rulesEngine?.assertToolInvocationAllowed({
tool: allowMatch,
toolClearance: toolPolicy,
context: toolAuditContext,
});
return {

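The handoff payload layering in the executor above reduces to plain object-spread precedence. A minimal sketch under simplified types (`JsonObject` narrowed to `Record<string, unknown>`; the task-lifecycle and finalization shapes are illustrative stand-ins for the real interfaces):

```typescript
// Later spreads win: actor result overrides the prior handoff payload,
// task resolution fields override both, and a finalization patch wins last.
type JsonObject = Record<string, unknown>;

function layerHandoffPayload(input: {
  handoffPayload: JsonObject;
  resultPayload?: JsonObject;
  taskResolution?: { taskId: string; worktreePath: string };
  finalizationPatch?: JsonObject;
}): JsonObject {
  let payload: JsonObject = {
    ...input.handoffPayload,
    ...(input.resultPayload ?? {}),
    ...(input.taskResolution
      ? { taskId: input.taskResolution.taskId, worktreePath: input.taskResolution.worktreePath }
      : {}),
  };
  if (input.finalizationPatch) {
    payload = { ...payload, ...input.finalizationPatch };
  }
  return payload;
}

const layered = layerHandoffPayload({
  handoffPayload: { taskId: "stale", note: "from previous node" },
  resultPayload: { summary: "done" },
  taskResolution: { taskId: "task-1", worktreePath: "/tmp/wt/task-1" },
  finalizationPatch: { mergeStatus: "merged" },
});
// Unrelated keys survive each layer; taskId is refreshed by the resolution,
// and the finalization patch adds its keys on top.
```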
View File

@@ -5,12 +5,23 @@ import { deepCloneJson, isRecord, type JsonObject, type JsonValue } from "./type
export const PROJECT_CONTEXT_SCHEMA_VERSION = 1;
export type ProjectTaskStatus = "pending" | "in_progress" | "blocked" | "done";
export type ProjectTaskStatus =
| "pending"
| "in_progress"
| "review"
| "conflict"
| "resolving_conflict"
| "merged"
| "failed"
| "blocked"
| "done";
export type ProjectTask = {
id: string;
title: string;
taskId: string;
id?: string;
title?: string;
status: ProjectTaskStatus;
worktreePath?: string;
assignee?: string;
metadata?: JsonObject;
};
@@ -52,7 +63,17 @@ function toJsonObject(value: unknown, label: string): JsonObject {
}
function toTaskStatus(value: unknown, label: string): ProjectTaskStatus {
if (value === "pending" || value === "in_progress" || value === "blocked" || value === "done") {
if (
value === "pending" ||
value === "in_progress" ||
value === "review" ||
value === "conflict" ||
value === "resolving_conflict" ||
value === "merged" ||
value === "failed" ||
value === "blocked" ||
value === "done"
) {
return value;
}
throw new Error(`${label} has unsupported status "${String(value)}".`);
@@ -68,10 +89,28 @@ function toProjectTask(value: unknown, label: string): ProjectTask {
throw new Error(`${label}.assignee must be a non-empty string when provided.`);
}
const taskIdCandidate = value.taskId ?? value.id;
const taskId = assertNonEmptyString(taskIdCandidate, `${label}.taskId`);
const titleRaw = value.title;
if (titleRaw !== undefined && (typeof titleRaw !== "string" || titleRaw.trim().length === 0)) {
throw new Error(`${label}.title must be a non-empty string when provided.`);
}
const worktreePathRaw = value.worktreePath;
if (
worktreePathRaw !== undefined &&
(typeof worktreePathRaw !== "string" || worktreePathRaw.trim().length === 0)
) {
throw new Error(`${label}.worktreePath must be a non-empty string when provided.`);
}
return {
id: assertNonEmptyString(value.id, `${label}.id`),
title: assertNonEmptyString(value.title, `${label}.title`),
taskId,
id: taskId,
...(typeof titleRaw === "string" ? { title: titleRaw.trim() } : {}),
status: toTaskStatus(value.status, `${label}.status`),
...(typeof worktreePathRaw === "string" ? { worktreePath: worktreePathRaw.trim() } : {}),
...(typeof assignee === "string" ? { assignee: assignee.trim() } : {}),
...(value.metadata !== undefined
? { metadata: toJsonObject(value.metadata, `${label}.metadata`) }
@@ -157,10 +196,10 @@ function mergeUpsertTasks(current: ProjectTask[], upserts: ProjectTask[]): Proje
const byId = new Map<string, ProjectTask>();
for (const task of current) {
byId.set(task.id, task);
byId.set(task.taskId, task);
}
for (const task of upserts) {
byId.set(task.id, task);
byId.set(task.taskId, task);
}
return [...byId.values()];

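The `taskId`-keyed upsert above preserves insertion order because `Map` iterates in insertion order. A self-contained sketch with `ProjectTask` narrowed to the fields the merge actually touches:

```typescript
// Existing tasks are replaced in place by taskId; new tasks are appended;
// original ordering is preserved for untouched entries.
type ProjectTask = { taskId: string; status: string; title?: string };

function mergeUpsertTasks(current: ProjectTask[], upserts: ProjectTask[]): ProjectTask[] {
  const byId = new Map<string, ProjectTask>();
  for (const task of current) {
    byId.set(task.taskId, task);
  }
  for (const task of upserts) {
    // Re-setting an existing key keeps its original position in the Map.
    byId.set(task.taskId, task);
  }
  return [...byId.values()];
}

const merged = mergeUpsertTasks(
  [
    { taskId: "a", status: "pending" },
    { taskId: "b", status: "in_progress" },
  ],
  [
    { taskId: "a", status: "merged" },
    { taskId: "c", status: "pending" },
  ],
);
// merged: "a" updated in place, "b" untouched, "c" appended at the end.
```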
View File

@@ -197,9 +197,9 @@ export class ResourceProvisioningOrchestrator {
async provisionSession(input: {
sessionId: string;
resources: ResourceRequest[];
workspaceRoot?: string;
workspaceRoot: string;
}): Promise<ProvisionedResources> {
const workspaceRoot = resolve(input.workspaceRoot ?? process.cwd());
const workspaceRoot = resolve(input.workspaceRoot);
const hardConstraints: ProvisionedResourcesState["hardConstraints"] = [];
const releases: ProvisionedResourcesState["releases"] = [];
const env: Record<string, string> = {};

View File

@@ -0,0 +1,783 @@
import { execFile } from "node:child_process";
import { randomUUID } from "node:crypto";
import { mkdir, readFile, readdir, stat } from "node:fs/promises";
import { dirname, isAbsolute, resolve } from "node:path";
import { promisify } from "node:util";
import { withFileLock, writeUtf8FileAtomic } from "./file-persistence.js";
const execFileAsync = promisify(execFile);
const SESSION_METADATA_FILE_NAME = "session-metadata.json";
export type SessionStatus = "active" | "suspended" | "closed" | "closed_with_conflicts";
export type SessionMetadata = {
sessionId: string;
projectPath: string;
sessionStatus: SessionStatus;
baseWorkspacePath: string;
createdAt: string;
updatedAt: string;
};
export type CreateSessionRequest = {
projectPath: string;
};
export type MergeTaskIntoBaseOutcome =
| {
kind: "success";
taskId: string;
worktreePath: string;
baseWorkspacePath: string;
}
| {
kind: "conflict";
taskId: string;
worktreePath: string;
baseWorkspacePath: string;
conflictFiles: string[];
mergeBase?: string;
}
| {
kind: "fatal_error";
taskId: string;
worktreePath: string;
baseWorkspacePath: string;
error: string;
mergeBase?: string;
};
export type CloseSessionOutcome =
| {
kind: "success";
sessionId: string;
mergedToProject: boolean;
}
| {
kind: "conflict";
sessionId: string;
worktreePath: string;
conflictFiles: string[];
mergeBase?: string;
baseBranch?: string;
}
| {
kind: "fatal_error";
sessionId: string;
error: string;
baseBranch?: string;
mergeBase?: string;
};
type GitExecutionResult = {
exitCode: number;
stdout: string;
stderr: string;
};
type GitWorktreeRecord = {
path: string;
branchRef?: string;
};
function toErrorMessage(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
return String(error);
}
function assertAbsolutePath(path: string, label: string): string {
if (!isAbsolute(path)) {
throw new Error(`${label} must be an absolute path.`);
}
return resolve(path);
}
function assertNonEmptyString(value: unknown, label: string): string {
if (typeof value !== "string" || value.trim().length === 0) {
throw new Error(`${label} must be a non-empty string.`);
}
return value.trim();
}
function toSessionStatus(value: unknown): SessionStatus {
if (
value === "active" ||
value === "suspended" ||
value === "closed" ||
value === "closed_with_conflicts"
) {
return value;
}
throw new Error(`Session status "${String(value)}" is not supported.`);
}
function toSessionMetadata(value: unknown): SessionMetadata {
if (!value || typeof value !== "object" || Array.isArray(value)) {
throw new Error("Session metadata file is malformed.");
}
const raw = value as Record<string, unknown>;
return {
sessionId: assertNonEmptyString(raw.sessionId, "sessionId"),
projectPath: assertAbsolutePath(assertNonEmptyString(raw.projectPath, "projectPath"), "projectPath"),
baseWorkspacePath: assertAbsolutePath(
assertNonEmptyString(raw.baseWorkspacePath, "baseWorkspacePath"),
"baseWorkspacePath",
),
sessionStatus: toSessionStatus(raw.sessionStatus),
createdAt: assertNonEmptyString(raw.createdAt, "createdAt"),
updatedAt: assertNonEmptyString(raw.updatedAt, "updatedAt"),
};
}
async function runGit(args: string[]): Promise<string> {
const result = await runGitWithResult(args);
if (result.exitCode !== 0) {
throw new Error(`git ${args.join(" ")} failed: ${result.stderr || result.stdout || "unknown git error"}`);
}
return result.stdout.trim();
}
async function runGitWithResult(args: string[]): Promise<GitExecutionResult> {
try {
const { stdout, stderr } = await execFileAsync("git", args, {
encoding: "utf8",
});
return {
exitCode: 0,
stdout: stdout.trim(),
stderr: stderr.trim(),
};
} catch (error) {
const failure = error as {
code?: number | string;
stdout?: string;
stderr?: string;
};
if (typeof failure.code === "number") {
return {
exitCode: failure.code,
stdout: String(failure.stdout ?? "").trim(),
stderr: String(failure.stderr ?? "").trim(),
};
}
throw new Error(`git ${args.join(" ")} failed: ${toErrorMessage(error)}`);
}
}
async function pathExists(path: string): Promise<boolean> {
try {
await stat(path);
return true;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
return false;
}
throw error;
}
}
function sanitizeSegment(value: string, fallback: string): string {
const normalized = value
.trim()
.replace(/[^a-zA-Z0-9_-]/g, "-")
.replace(/-+/g, "-")
.replace(/^-+/, "")
.replace(/-+$/, "");
return normalized || fallback;
}
function toGitFailureMessage(result: GitExecutionResult): string {
const details = result.stderr || result.stdout || "unknown git error";
return `git command failed with exit code ${String(result.exitCode)}: ${details}`;
}
function toStringLines(value: string): string[] {
return value
.split("\n")
.map((line) => line.trim())
.filter((line) => line.length > 0);
}
function parseGitWorktreeRecords(value: string): GitWorktreeRecord[] {
const lines = value.split("\n");
const records: GitWorktreeRecord[] = [];
let current: GitWorktreeRecord | undefined;
for (const line of lines) {
if (!line.trim()) {
if (current) {
records.push(current);
current = undefined;
}
continue;
}
if (line.startsWith("worktree ")) {
if (current) {
records.push(current);
}
current = {
path: line.slice("worktree ".length).trim(),
};
continue;
}
if (line.startsWith("branch ") && current) {
current.branchRef = line.slice("branch ".length).trim();
}
}
if (current) {
records.push(current);
}
return records;
}
export class FileSystemSessionMetadataStore {
private readonly stateRoot: string;
constructor(input: { stateRoot: string }) {
this.stateRoot = resolve(input.stateRoot);
}
getStateRoot(): string {
return this.stateRoot;
}
getSessionDirectory(sessionId: string): string {
return resolve(this.stateRoot, sessionId);
}
getSessionMetadataPath(sessionId: string): string {
return resolve(this.getSessionDirectory(sessionId), SESSION_METADATA_FILE_NAME);
}
getSessionProjectContextPath(sessionId: string): string {
return resolve(this.getSessionDirectory(sessionId), "project-context.json");
}
async createSession(input: {
projectPath: string;
baseWorkspacePath: string;
sessionId?: string;
}): Promise<SessionMetadata> {
const sessionId = input.sessionId?.trim() || randomUUID();
const now = new Date().toISOString();
const metadata: SessionMetadata = {
sessionId,
projectPath: assertAbsolutePath(input.projectPath, "projectPath"),
baseWorkspacePath: assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath"),
sessionStatus: "active",
createdAt: now,
updatedAt: now,
};
const sessionDirectory = this.getSessionDirectory(sessionId);
await mkdir(sessionDirectory, { recursive: true });
await this.writeSessionMetadata(metadata);
return metadata;
}
async readSession(sessionId: string): Promise<SessionMetadata | undefined> {
const metadataPath = this.getSessionMetadataPath(sessionId);
try {
const content = await readFile(metadataPath, "utf8");
return toSessionMetadata(JSON.parse(content) as unknown);
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
return undefined;
}
throw error;
}
}
async listSessions(): Promise<SessionMetadata[]> {
try {
const entries = await readdir(this.stateRoot, { withFileTypes: true });
const sessions: SessionMetadata[] = [];
for (const entry of entries) {
if (!entry.isDirectory()) {
continue;
}
const metadata = await this.readSession(entry.name);
if (metadata) {
sessions.push(metadata);
}
}
sessions.sort((left, right) => right.createdAt.localeCompare(left.createdAt));
return sessions;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
return [];
}
throw error;
}
}
async updateSession(
sessionId: string,
patch: Partial<Pick<SessionMetadata, "projectPath" | "baseWorkspacePath" | "sessionStatus">>,
): Promise<SessionMetadata> {
const current = await this.readSession(sessionId);
if (!current) {
throw new Error(`Session "${sessionId}" does not exist.`);
}
const next: SessionMetadata = {
...current,
...(patch.projectPath ? { projectPath: assertAbsolutePath(patch.projectPath, "projectPath") } : {}),
...(patch.baseWorkspacePath
? { baseWorkspacePath: assertAbsolutePath(patch.baseWorkspacePath, "baseWorkspacePath") }
: {}),
...(patch.sessionStatus ? { sessionStatus: patch.sessionStatus } : {}),
updatedAt: new Date().toISOString(),
};
await this.writeSessionMetadata(next);
return next;
}
private async writeSessionMetadata(metadata: SessionMetadata): Promise<void> {
const metadataPath = this.getSessionMetadataPath(metadata.sessionId);
await mkdir(dirname(metadataPath), { recursive: true });
await withFileLock(`${metadataPath}.lock`, async () => {
await writeUtf8FileAtomic(metadataPath, `${JSON.stringify(metadata, null, 2)}\n`);
});
}
}
export class SessionWorktreeManager {
private readonly worktreeRoot: string;
private readonly baseRef: string;
constructor(input: {
worktreeRoot: string;
baseRef: string;
}) {
this.worktreeRoot = assertAbsolutePath(input.worktreeRoot, "worktreeRoot");
this.baseRef = assertNonEmptyString(input.baseRef, "baseRef");
}
resolveBaseWorkspacePath(sessionId: string): string {
const scoped = sanitizeSegment(sessionId, "session");
return resolve(this.worktreeRoot, scoped, "base");
}
resolveTaskWorktreePath(sessionId: string, taskId: string): string {
const scopedSession = sanitizeSegment(sessionId, "session");
const scopedTask = sanitizeSegment(taskId, "task");
return resolve(this.worktreeRoot, scopedSession, "tasks", scopedTask);
}
private resolveBaseBranchName(sessionId: string): string {
const scoped = sanitizeSegment(sessionId, "session");
return `ai-ops/${scoped}/base`;
}
private resolveTaskBranchName(sessionId: string, taskId: string): string {
const scopedSession = sanitizeSegment(sessionId, "session");
const scopedTask = sanitizeSegment(taskId, "task");
return `ai-ops/${scopedSession}/task/${scopedTask}`;
}
async initializeSessionBaseWorkspace(input: {
sessionId: string;
projectPath: string;
baseWorkspacePath: string;
}): Promise<void> {
const projectPath = assertAbsolutePath(input.projectPath, "projectPath");
const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath");
await mkdir(dirname(baseWorkspacePath), { recursive: true });
const alreadyExists = await pathExists(baseWorkspacePath);
if (alreadyExists) {
return;
}
const repoRoot = await runGit(["-C", projectPath, "rev-parse", "--show-toplevel"]);
const branchName = this.resolveBaseBranchName(input.sessionId);
await runGit(["-C", repoRoot, "worktree", "add", "-B", branchName, baseWorkspacePath, this.baseRef]);
}
async ensureTaskWorktree(input: {
sessionId: string;
taskId: string;
baseWorkspacePath: string;
existingWorktreePath?: string;
}): Promise<{
taskWorktreePath: string;
}> {
const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath");
const maybeExisting = input.existingWorktreePath?.trim();
const worktreePath = maybeExisting
? assertAbsolutePath(maybeExisting, "existingWorktreePath")
: this.resolveTaskWorktreePath(input.sessionId, input.taskId);
const branchName = this.resolveTaskBranchName(input.sessionId, input.taskId);
const attachedWorktree = await this.findWorktreePathForBranch(baseWorkspacePath, branchName);
if (attachedWorktree && attachedWorktree !== worktreePath) {
throw new Error(
`Task branch "${branchName}" is already attached to worktree "${attachedWorktree}", ` +
`expected "${worktreePath}".`,
);
}
if (!(await pathExists(worktreePath))) {
await runGit(["-C", baseWorkspacePath, "worktree", "prune", "--expire", "now"]);
}
if (!(await pathExists(worktreePath))) {
await mkdir(dirname(worktreePath), { recursive: true });
const addResult = await runGitWithResult([
"-C",
baseWorkspacePath,
"worktree",
"add",
"-B",
branchName,
worktreePath,
"HEAD",
]);
if (addResult.exitCode !== 0) {
const attachedAfterFailure = await this.findWorktreePathForBranch(baseWorkspacePath, branchName);
if (attachedAfterFailure === worktreePath && (await pathExists(worktreePath))) {
return {
taskWorktreePath: worktreePath,
};
}
throw new Error(
`git -C ${baseWorkspacePath} worktree add -B ${branchName} ${worktreePath} HEAD failed: ` +
`${toGitFailureMessage(addResult)}`,
);
}
}
return {
taskWorktreePath: worktreePath,
};
}
async mergeTaskIntoBase(input: {
taskId: string;
baseWorkspacePath: string;
taskWorktreePath: string;
}): Promise<MergeTaskIntoBaseOutcome> {
const baseWorkspacePath = assertAbsolutePath(input.baseWorkspacePath, "baseWorkspacePath");
const taskWorktreePath = assertAbsolutePath(input.taskWorktreePath, "taskWorktreePath");
const taskId = input.taskId;
if (!(await pathExists(baseWorkspacePath))) {
throw new Error(`Base workspace "${baseWorkspacePath}" does not exist.`);
}
if (!(await pathExists(taskWorktreePath))) {
throw new Error(`Task worktree "${taskWorktreePath}" does not exist.`);
}
let mergeBase: string | undefined;
try {
await runGit(["-C", taskWorktreePath, "add", "-A"]);
const hasPending = await this.hasStagedChanges(taskWorktreePath);
if (hasPending) {
await runGit(["-C", taskWorktreePath, "commit", "-m", `ai_ops: finalize task ${taskId}`]);
}
const branchName = await runGit(["-C", taskWorktreePath, "rev-parse", "--abbrev-ref", "HEAD"]);
const baseBranch = await runGit(["-C", baseWorkspacePath, "rev-parse", "--abbrev-ref", "HEAD"]);
mergeBase = await this.tryReadMergeBase(baseWorkspacePath, baseBranch, branchName);
if (await this.hasOngoingMerge(taskWorktreePath)) {
return {
kind: "conflict",
taskId,
worktreePath: taskWorktreePath,
baseWorkspacePath,
conflictFiles: await this.readConflictFiles(taskWorktreePath),
...(mergeBase ? { mergeBase } : {}),
};
}
const syncTaskBranch = await runGitWithResult([
"-C",
taskWorktreePath,
"merge",
"--no-ff",
"--no-edit",
baseBranch,
]);
if (syncTaskBranch.exitCode === 1) {
return {
kind: "conflict",
taskId,
worktreePath: taskWorktreePath,
baseWorkspacePath,
conflictFiles: await this.readConflictFiles(taskWorktreePath),
...(mergeBase ? { mergeBase } : {}),
};
}
if (syncTaskBranch.exitCode !== 0) {
return {
kind: "fatal_error",
taskId,
worktreePath: taskWorktreePath,
baseWorkspacePath,
error: toGitFailureMessage(syncTaskBranch),
...(mergeBase ? { mergeBase } : {}),
};
}
if (await this.hasOngoingMerge(baseWorkspacePath)) {
return {
kind: "conflict",
taskId,
worktreePath: baseWorkspacePath,
baseWorkspacePath,
conflictFiles: await this.readConflictFiles(baseWorkspacePath),
...(mergeBase ? { mergeBase } : {}),
};
}
const mergeIntoBase = await runGitWithResult([
"-C",
baseWorkspacePath,
"merge",
"--no-ff",
"--no-edit",
branchName,
]);
if (mergeIntoBase.exitCode === 1) {
return {
kind: "conflict",
taskId,
worktreePath: baseWorkspacePath,
baseWorkspacePath,
conflictFiles: await this.readConflictFiles(baseWorkspacePath),
...(mergeBase ? { mergeBase } : {}),
};
}
if (mergeIntoBase.exitCode !== 0) {
return {
kind: "fatal_error",
taskId,
worktreePath: taskWorktreePath,
baseWorkspacePath,
error: toGitFailureMessage(mergeIntoBase),
...(mergeBase ? { mergeBase } : {}),
};
}
await this.removeWorktree({
repoPath: baseWorkspacePath,
worktreePath: taskWorktreePath,
});
return {
kind: "success",
taskId,
worktreePath: taskWorktreePath,
baseWorkspacePath,
};
} catch (error) {
return {
kind: "fatal_error",
taskId,
worktreePath: taskWorktreePath,
baseWorkspacePath,
error: toErrorMessage(error),
...(mergeBase ? { mergeBase } : {}),
};
}
}
async closeSession(input: {
session: SessionMetadata;
taskWorktreePaths: string[];
mergeBaseIntoProject?: boolean;
}): Promise<CloseSessionOutcome> {
const projectPath = assertAbsolutePath(input.session.projectPath, "projectPath");
const baseWorkspacePath = assertAbsolutePath(input.session.baseWorkspacePath, "baseWorkspacePath");
if (!(await pathExists(projectPath))) {
throw new Error(`Project path "${projectPath}" does not exist.`);
}
if (!(await pathExists(baseWorkspacePath))) {
throw new Error(`Base workspace "${baseWorkspacePath}" does not exist.`);
}
let baseBranch: string | undefined;
let mergeBase: string | undefined;
try {
for (const taskWorktreePath of input.taskWorktreePaths) {
if (!taskWorktreePath.trim()) {
continue;
}
await this.removeWorktree({
repoPath: baseWorkspacePath,
worktreePath: taskWorktreePath,
});
}
if (input.mergeBaseIntoProject) {
baseBranch = await runGit(["-C", baseWorkspacePath, "rev-parse", "--abbrev-ref", "HEAD"]);
mergeBase = await this.tryReadMergeBase(projectPath, "HEAD", baseBranch);
if (await this.hasOngoingMerge(projectPath)) {
return {
kind: "conflict",
sessionId: input.session.sessionId,
worktreePath: projectPath,
conflictFiles: await this.readConflictFiles(projectPath),
...(baseBranch ? { baseBranch } : {}),
...(mergeBase ? { mergeBase } : {}),
};
}
const mergeResult = await runGitWithResult([
"-C",
projectPath,
"merge",
"--no-ff",
"--no-edit",
baseBranch,
]);
if (mergeResult.exitCode === 1) {
return {
kind: "conflict",
sessionId: input.session.sessionId,
worktreePath: projectPath,
conflictFiles: await this.readConflictFiles(projectPath),
...(baseBranch ? { baseBranch } : {}),
...(mergeBase ? { mergeBase } : {}),
};
}
if (mergeResult.exitCode !== 0) {
return {
kind: "fatal_error",
sessionId: input.session.sessionId,
error: toGitFailureMessage(mergeResult),
...(baseBranch ? { baseBranch } : {}),
...(mergeBase ? { mergeBase } : {}),
};
}
}
await this.removeWorktree({
repoPath: projectPath,
worktreePath: baseWorkspacePath,
});
return {
kind: "success",
sessionId: input.session.sessionId,
mergedToProject: input.mergeBaseIntoProject === true,
};
} catch (error) {
return {
kind: "fatal_error",
sessionId: input.session.sessionId,
error: toErrorMessage(error),
...(baseBranch ? { baseBranch } : {}),
...(mergeBase ? { mergeBase } : {}),
};
}
}
private async removeWorktree(input: {
repoPath: string;
worktreePath: string;
}): Promise<void> {
if (!(await pathExists(input.worktreePath))) {
return;
}
await runGit(["-C", input.repoPath, "worktree", "remove", "--force", input.worktreePath]);
await runGit(["-C", input.repoPath, "worktree", "prune"]);
}
private async hasStagedChanges(worktreePath: string): Promise<boolean> {
try {
await execFileAsync("git", ["-C", worktreePath, "diff", "--cached", "--quiet"], {
encoding: "utf8",
});
return false;
} catch (error) {
const exitCode = (error as { code?: number }).code;
if (exitCode === 1) {
return true;
}
throw new Error(`Unable to inspect staged changes: ${toErrorMessage(error)}`);
}
}
private async hasOngoingMerge(worktreePath: string): Promise<boolean> {
const result = await runGitWithResult([
"-C",
worktreePath,
"rev-parse",
"-q",
"--verify",
"MERGE_HEAD",
]);
return result.exitCode === 0;
}
private async readConflictFiles(worktreePath: string): Promise<string[]> {
const result = await runGitWithResult([
"-C",
worktreePath,
"diff",
"--name-only",
"--diff-filter=U",
]);
if (result.exitCode !== 0) {
return [];
}
return toStringLines(result.stdout);
}
private async tryReadMergeBase(
repoPath: string,
leftRef: string,
rightRef: string,
): Promise<string | undefined> {
const result = await runGitWithResult(["-C", repoPath, "merge-base", leftRef, rightRef]);
if (result.exitCode !== 0) {
return undefined;
}
const mergeBase = result.stdout.trim();
return mergeBase || undefined;
}
private async findWorktreePathForBranch(
repoPath: string,
branchName: string,
): Promise<string | undefined> {
const branchRef = `refs/heads/${branchName}`;
const records = await this.listWorktreeRecords(repoPath);
const matched = records.find((record) => record.branchRef === branchRef);
if (!matched) {
return undefined;
}
return resolve(matched.path);
}
private async listWorktreeRecords(repoPath: string): Promise<GitWorktreeRecord[]> {
const result = await runGitWithResult(["-C", repoPath, "worktree", "list", "--porcelain"]);
if (result.exitCode !== 0) {
return [];
}
return parseGitWorktreeRecords(result.stdout);
}
}
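The porcelain parser used by `findWorktreePathForBranch` can be exercised against a representative `git worktree list --porcelain` payload (paths and hashes below are illustrative):

```typescript
// Records are delimited by blank lines; each starts with a "worktree <path>"
// line and may carry a "branch <ref>" line. Other attribute lines (HEAD,
// detached) are ignored by this parser.
type GitWorktreeRecord = { path: string; branchRef?: string };

function parseGitWorktreeRecords(value: string): GitWorktreeRecord[] {
  const records: GitWorktreeRecord[] = [];
  let current: GitWorktreeRecord | undefined;
  for (const line of value.split("\n")) {
    if (!line.trim()) {
      if (current) {
        records.push(current);
        current = undefined;
      }
      continue;
    }
    if (line.startsWith("worktree ")) {
      // A new "worktree" line flushes any record still in flight.
      if (current) {
        records.push(current);
      }
      current = { path: line.slice("worktree ".length).trim() };
      continue;
    }
    if (line.startsWith("branch ") && current) {
      current.branchRef = line.slice("branch ".length).trim();
    }
  }
  if (current) {
    records.push(current);
  }
  return records;
}

const records = parseGitWorktreeRecords(
  [
    "worktree /repo",
    "HEAD 1111111111111111111111111111111111111111",
    "branch refs/heads/main",
    "",
    "worktree /repo/.workspace/s1/tasks/t1",
    "HEAD 2222222222222222222222222222222222222222",
    "branch refs/heads/ai-ops/s1/task/t1",
  ].join("\n"),
);
```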

View File

@@ -16,9 +16,21 @@ export type ProviderRuntimeConfig = {
anthropicApiKey?: string;
claudeModel?: string;
claudeCodePath?: string;
claudeObservability: ClaudeObservabilityRuntimeConfig;
};
export type OpenAiAuthMode = "auto" | "chatgpt" | "api_key";
export type ClaudeObservabilityMode = "off" | "stdout" | "file" | "both";
export type ClaudeObservabilityVerbosity = "summary" | "full";
export type ClaudeObservabilityRuntimeConfig = {
mode: ClaudeObservabilityMode;
verbosity: ClaudeObservabilityVerbosity;
logPath: string;
includePartialMessages: boolean;
debug: boolean;
debugLogPath?: string;
};
export type McpRuntimeConfig = {
configPath: string;
@@ -30,6 +42,7 @@ export type OrchestrationRuntimeConfig = {
maxDepth: number;
maxRetries: number;
maxChildren: number;
mergeConflictMaxAttempts: number;
};
export type DiscoveryRuntimeConfig = {
@@ -77,6 +90,7 @@ const DEFAULT_ORCHESTRATION: OrchestrationRuntimeConfig = {
maxDepth: 4,
maxRetries: 2,
maxChildren: 4,
mergeConflictMaxAttempts: 2,
};
const DEFAULT_PROVISIONING: BuiltInProvisioningConfig = {
@@ -113,6 +127,15 @@ const DEFAULT_RUNTIME_EVENTS: RuntimeEventRuntimeConfig = {
discordAlwaysNotifyTypes: ["session.started", "session.completed", "session.failed"],
};
const DEFAULT_CLAUDE_OBSERVABILITY: ClaudeObservabilityRuntimeConfig = {
mode: "off",
verbosity: "summary",
logPath: ".ai_ops/events/claude-trace.ndjson",
includePartialMessages: false,
debug: false,
debugLogPath: undefined,
};
function readOptionalString(
env: NodeJS.ProcessEnv,
key: string,
@@ -272,6 +295,26 @@ function parseOpenAiAuthMode(raw: string): OpenAiAuthMode {
);
}
function parseClaudeObservabilityMode(raw: string): ClaudeObservabilityMode {
if (raw === "off" || raw === "stdout" || raw === "file" || raw === "both") {
return raw;
}
throw new Error(
'Environment variable CLAUDE_OBSERVABILITY_MODE must be one of: "off", "stdout", "file", "both".',
);
}
function parseClaudeObservabilityVerbosity(raw: string): ClaudeObservabilityVerbosity {
if (raw === "summary" || raw === "full") {
return raw;
}
throw new Error(
'Environment variable CLAUDE_OBSERVABILITY_VERBOSITY must be one of: "summary", "full".',
);
}
function deepFreeze<T>(value: T): Readonly<T> {
if (value === null || typeof value !== "object") {
return value;
@@ -358,6 +401,38 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Readonly<AppCo
anthropicApiKey,
claudeModel: normalizeClaudeModel(readOptionalString(env, "CLAUDE_MODEL")),
claudeCodePath: readOptionalString(env, "CLAUDE_CODE_PATH"),
claudeObservability: {
mode: parseClaudeObservabilityMode(
readStringWithFallback(
env,
"CLAUDE_OBSERVABILITY_MODE",
DEFAULT_CLAUDE_OBSERVABILITY.mode,
),
),
verbosity: parseClaudeObservabilityVerbosity(
readStringWithFallback(
env,
"CLAUDE_OBSERVABILITY_VERBOSITY",
DEFAULT_CLAUDE_OBSERVABILITY.verbosity,
),
),
logPath: readStringWithFallback(
env,
"CLAUDE_OBSERVABILITY_LOG_PATH",
DEFAULT_CLAUDE_OBSERVABILITY.logPath,
),
includePartialMessages: readBooleanWithFallback(
env,
"CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL",
DEFAULT_CLAUDE_OBSERVABILITY.includePartialMessages,
),
debug: readBooleanWithFallback(
env,
"CLAUDE_OBSERVABILITY_DEBUG",
DEFAULT_CLAUDE_OBSERVABILITY.debug,
),
debugLogPath: readOptionalString(env, "CLAUDE_OBSERVABILITY_DEBUG_LOG_PATH"),
},
},
mcp: {
configPath: readStringWithFallback(env, "MCP_CONFIG_PATH", "./mcp.config.json"),
@@ -411,6 +486,12 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Readonly<AppCo
DEFAULT_ORCHESTRATION.maxChildren,
{ min: 1 },
),
mergeConflictMaxAttempts: readIntegerWithBounds(
env,
"AGENT_MERGE_CONFLICT_MAX_ATTEMPTS",
DEFAULT_ORCHESTRATION.mergeConflictMaxAttempts,
{ min: 1 },
),
},
provisioning: {
gitWorktree: {

View File

@@ -85,6 +85,7 @@ export async function runClaudePrompt(
const writeOutput = dependencies.writeOutput ?? ((output: string) => console.log(output));
const sessionContext = await createSessionContextFn("claude", {
prompt,
workspaceRoot: process.cwd(),
config,
});

View File

@@ -48,6 +48,7 @@ export async function runCodexPrompt(
const writeOutput = dependencies.writeOutput ?? ((output: string) => console.log(output));
const sessionContext = await createSessionContextFn("codex", {
prompt,
workspaceRoot: process.cwd(),
config,
});

View File

@@ -28,6 +28,7 @@ export async function createSessionContext(
provider: SessionProvider,
input: {
prompt: string;
workspaceRoot: string;
config?: Readonly<AppConfig>;
mcpRegistry?: McpRegistry;
},
@@ -58,6 +59,7 @@ export async function createSessionContext(
provisionedResources = await resourceProvisioning.provisionSession({
sessionId: agentSession.id,
resources: [{ kind: "git-worktree" }, { kind: "port-range" }],
workspaceRoot: input.workspaceRoot,
});
const providerAuthEnv =
@@ -82,6 +84,7 @@ export async function createSessionContext(
{
providerHint: provider,
prompt: input.prompt,
workingDirectory: runtimeInjection.workingDirectory,
},
{
config,

View File

@@ -1,5 +1,5 @@
import { existsSync, readFileSync } from "node:fs";
import { resolve } from "node:path";
import { isAbsolute, resolve } from "node:path";
import type { CodexOptions } from "@openai/codex-sdk";
import { getConfig, type AppConfig } from "./config.js";
import { normalizeSharedMcpConfigFile } from "./mcp/converters.js";
@@ -23,12 +23,17 @@ import type {
import { parseMcpConfig } from "./mcp/types.js";
import type { ToolClearancePolicy } from "./security/schemas.js";
function readConfigFile(configPath: string): {
function readConfigFile(input: {
configPath: string;
workingDirectory?: string;
}): {
config?: SharedMcpConfigFile;
sourcePath?: string;
} {
const candidatePath = configPath.trim() || "./mcp.config.json";
const resolvedPath = resolve(process.cwd(), candidatePath);
const candidatePath = input.configPath.trim() || "./mcp.config.json";
const resolvedPath = isAbsolute(candidatePath)
? candidatePath
: resolve(input.workingDirectory ?? process.cwd(), candidatePath);
if (!existsSync(resolvedPath)) {
if (candidatePath !== "./mcp.config.json") {
@@ -83,7 +88,10 @@ export function loadMcpConfigFromEnv(
const registry = options?.registry ?? defaultMcpRegistry;
const warn = options?.warn ?? ((message: string) => console.warn(message));
const { config, sourcePath } = readConfigFile(runtimeConfig.mcp.configPath);
const { config, sourcePath } = readConfigFile({
configPath: runtimeConfig.mcp.configPath,
workingDirectory: context.workingDirectory,
});
if (!config) {
return {};
}
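
The resolution rule this hunk introduces (absolute paths pass through; relative paths now resolve against the session's working directory instead of always `process.cwd()`) can be sketched standalone. `resolveMcpConfigPath` is an illustrative name, not part of the diff:

```typescript
import { isAbsolute, resolve } from "node:path";

// Mirrors the hunk above: a blank path falls back to the default name,
// absolute paths are used verbatim, and relative paths resolve against
// the supplied working directory (or process.cwd() when absent).
function resolveMcpConfigPath(configPath: string, workingDirectory?: string): string {
  const candidate = configPath.trim() || "./mcp.config.json";
  return isAbsolute(candidate)
    ? candidate
    : resolve(workingDirectory ?? process.cwd(), candidate);
}
```

This is what lets per-session worktrees load their own `mcp.config.json` rather than the orchestrator's.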

View File

@@ -50,6 +50,7 @@ export type SharedMcpConfigFile = {
export type McpLoadContext = {
providerHint?: "codex" | "claude" | "both";
prompt?: string;
workingDirectory?: string;
};
export type LoadedMcpConfig = {

View File

@@ -13,41 +13,43 @@ import {
} from "./schemas.js";
export type SecurityAuditEvent =
| {
| ({
type: "shell.command_profiled";
timestamp: string;
command: string;
cwd: string;
parsed: ParsedShellScript;
}
| {
} & SecurityAuditContext)
| ({
type: "shell.command_allowed";
timestamp: string;
command: string;
cwd: string;
commandCount: number;
}
| {
} & SecurityAuditContext)
| ({
type: "shell.command_blocked";
timestamp: string;
command: string;
cwd: string;
reason: string;
code: string;
details?: Record<string, unknown>;
}
| {
} & SecurityAuditContext)
| ({
type: "tool.invocation_allowed";
timestamp: string;
tool: string;
}
| {
} & SecurityAuditContext)
| ({
type: "tool.invocation_blocked";
timestamp: string;
tool: string;
reason: string;
code: string;
};
} & SecurityAuditContext);
export type SecurityAuditContext = {
timestamp: string;
sessionId?: string;
nodeId?: string;
attempt?: number;
};
export type SecurityAuditSink = (event: SecurityAuditEvent) => void;
@@ -102,6 +104,28 @@ function toNow(): string {
return new Date().toISOString();
}
function toAuditContext(input?: {
sessionId?: string;
nodeId?: string;
attempt?: number;
}): SecurityAuditContext {
const output: SecurityAuditContext = {
timestamp: toNow(),
};
if (input?.sessionId) {
output.sessionId = input.sessionId;
}
if (input?.nodeId) {
output.nodeId = input.nodeId;
}
if (typeof input?.attempt === "number" && Number.isInteger(input.attempt) && input.attempt >= 1) {
output.attempt = input.attempt;
}
return output;
}
export class SecurityRulesEngine {
private readonly policy: ShellValidationPolicy;
private readonly allowedBinaries: Set<string>;
@@ -136,6 +160,11 @@ export class SecurityRulesEngine {
command: string;
cwd: string;
toolClearance?: ToolClearancePolicy;
context?: {
sessionId?: string;
nodeId?: string;
attempt?: number;
};
}): Promise<ValidatedShellCommand> {
const resolvedCwd = resolve(input.cwd);
@@ -147,22 +176,22 @@ export class SecurityRulesEngine {
: undefined;
this.emit({
...toAuditContext(input.context),
type: "shell.command_profiled",
timestamp: toNow(),
command: input.command,
cwd: resolvedCwd,
parsed,
});
for (const command of parsed.commands) {
this.assertBinaryAllowed(command, toolClearance);
this.assertBinaryAllowed(command, toolClearance, input.context);
this.assertAssignmentsAllowed(command);
this.assertArgumentPaths(command, resolvedCwd);
}
this.emit({
...toAuditContext(input.context),
type: "shell.command_allowed",
timestamp: toNow(),
command: input.command,
cwd: resolvedCwd,
commandCount: parsed.commandCount,
@@ -175,8 +204,8 @@ export class SecurityRulesEngine {
} catch (error) {
if (error instanceof SecurityViolationError) {
this.emit({
...toAuditContext(input.context),
type: "shell.command_blocked",
timestamp: toNow(),
command: input.command,
cwd: resolvedCwd,
reason: error.message,
@@ -196,13 +225,18 @@ export class SecurityRulesEngine {
assertToolInvocationAllowed(input: {
tool: string;
toolClearance: ToolClearancePolicy;
context?: {
sessionId?: string;
nodeId?: string;
attempt?: number;
};
}): void {
const policy = parseToolClearancePolicy(input.toolClearance);
if (policy.banlist.includes(input.tool)) {
this.emit({
...toAuditContext(input.context),
type: "tool.invocation_blocked",
timestamp: toNow(),
tool: input.tool,
reason: `Tool "${input.tool}" is explicitly banned by policy.`,
code: "TOOL_BANNED",
@@ -220,8 +254,8 @@ export class SecurityRulesEngine {
if (policy.allowlist.length > 0 && !policy.allowlist.includes(input.tool)) {
this.emit({
...toAuditContext(input.context),
type: "tool.invocation_blocked",
timestamp: toNow(),
tool: input.tool,
reason: `Tool "${input.tool}" is not present in allowlist.`,
code: "TOOL_NOT_ALLOWED",
@@ -238,8 +272,8 @@ export class SecurityRulesEngine {
}
this.emit({
...toAuditContext(input.context),
type: "tool.invocation_allowed",
timestamp: toNow(),
tool: input.tool,
});
}
@@ -290,6 +324,11 @@ export class SecurityRulesEngine {
private assertBinaryAllowed(
command: ParsedShellCommand,
toolClearance?: ToolClearancePolicy,
context?: {
sessionId?: string;
nodeId?: string;
attempt?: number;
},
): void {
const binaryToken = normalizeToken(command.binary);
const binaryName = basename(binaryToken);
@@ -313,6 +352,7 @@ export class SecurityRulesEngine {
this.assertToolInvocationAllowed({
tool: binaryName,
toolClearance,
context,
});
}
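
The effect of spreading `...toAuditContext(input.context)` into each emitted event can be shown with a small sketch. The event shape below is illustrative; the attempt-validation rule matches the hunk (non-integer or sub-1 attempts are dropped):

```typescript
type SecurityAuditContext = {
  timestamp: string;
  sessionId?: string;
  nodeId?: string;
  attempt?: number;
};

// Same validation as the diff's toAuditContext: always stamp a timestamp,
// copy correlation fields only when present, and keep attempt only when
// it is an integer >= 1.
function toAuditContext(input?: { sessionId?: string; nodeId?: string; attempt?: number }): SecurityAuditContext {
  const output: SecurityAuditContext = { timestamp: new Date().toISOString() };
  if (input?.sessionId) output.sessionId = input.sessionId;
  if (input?.nodeId) output.nodeId = input.nodeId;
  if (typeof input?.attempt === "number" && Number.isInteger(input.attempt) && input.attempt >= 1) {
    output.attempt = input.attempt;
  }
  return output;
}

// attempt: 0 is silently dropped; sessionId/nodeId survive the spread.
const event = {
  ...toAuditContext({ sessionId: "s1", nodeId: "n2", attempt: 0 }),
  type: "tool.invocation_allowed" as const,
  tool: "git",
};
```

Because the spread comes first, each event's own fields (type, tool, reason) can never be clobbered by the context.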

View File

@@ -0,0 +1,821 @@
import { randomUUID } from "node:crypto";
import { appendFile, mkdir } from "node:fs/promises";
import { dirname, resolve } from "node:path";
import type { Options, SDKMessage } from "@anthropic-ai/claude-agent-sdk";
import type {
ClaudeObservabilityMode,
ClaudeObservabilityRuntimeConfig,
ClaudeObservabilityVerbosity,
} from "../config.js";
import type { JsonObject, JsonValue } from "../agents/types.js";
const MAX_STRING_LENGTH = 320;
const MAX_ARRAY_ITEMS = 20;
const MAX_OBJECT_KEYS = 60;
const MAX_DEPTH = 6;
const NON_SECRET_TOKEN_KEYS = new Set([
"input_tokens",
"output_tokens",
"total_tokens",
"cache_creation_input_tokens",
"cache_read_input_tokens",
"ephemeral_1h_input_tokens",
"ephemeral_5m_input_tokens",
"token_input",
"token_output",
"token_total",
"tokencount",
"token_count",
"tool_use_id",
"parent_tool_use_id",
"task_id",
"session_id",
]);
type ClaudeTraceContext = {
sessionId: string;
nodeId: string;
attempt: number;
depth: number;
};
type ClaudeTraceRecord = {
id: string;
timestamp: string;
source: "claude_sdk";
stage:
| "query.started"
| "query.message"
| "query.stderr"
| "query.completed"
| "query.error";
message: string;
sessionId: string;
nodeId: string;
attempt: number;
depth: number;
sdkSessionId?: string;
sdkMessageType?: string;
sdkMessageSubtype?: string;
data?: JsonObject;
};
function truncate(value: string, maxLength = MAX_STRING_LENGTH): string {
if (value.length <= maxLength) {
return value;
}
return `${value.slice(0, maxLength)}...`;
}
function isSensitiveKey(key: string): boolean {
const normalized = key.trim().toLowerCase();
if (!normalized) {
return false;
}
if (NON_SECRET_TOKEN_KEYS.has(normalized)) {
return false;
}
if (/(api[_-]?key|secret|password|authorization|cookie)/i.test(key)) {
return true;
}
if (/(auth[_-]?token|access[_-]?token|refresh[_-]?token|id[_-]?token|oauth)/i.test(key)) {
return true;
}
return normalized === "token";
}
function toJsonPrimitive(value: unknown): JsonValue {
if (value === null) {
return null;
}
if (typeof value === "string") {
return truncate(value);
}
if (typeof value === "number") {
return Number.isFinite(value) ? value : String(value);
}
if (typeof value === "boolean") {
return value;
}
if (typeof value === "bigint") {
return String(value);
}
if (typeof value === "undefined") {
return null;
}
return truncate(String(value));
}
function sanitizeJsonValue(value: unknown, depth = 0): JsonValue {
if (depth >= MAX_DEPTH) {
return "[depth_limit]";
}
if (
value === null ||
typeof value === "string" ||
typeof value === "number" ||
typeof value === "boolean" ||
typeof value === "bigint" ||
typeof value === "undefined"
) {
return toJsonPrimitive(value);
}
if (Array.isArray(value)) {
const output = value.slice(0, MAX_ARRAY_ITEMS).map((entry) => sanitizeJsonValue(entry, depth + 1));
if (value.length > MAX_ARRAY_ITEMS) {
output.push(`[+${String(value.length - MAX_ARRAY_ITEMS)} more]`);
}
return output;
}
if (typeof value === "object") {
const output: JsonObject = {};
const entries = Object.entries(value as Record<string, unknown>);
const limited = entries.slice(0, MAX_OBJECT_KEYS);
for (const [key, entryValue] of limited) {
if (isSensitiveKey(key)) {
output[key] = "[redacted]";
continue;
}
output[key] = sanitizeJsonValue(entryValue, depth + 1);
}
if (entries.length > MAX_OBJECT_KEYS) {
output.__truncated_keys = entries.length - MAX_OBJECT_KEYS;
}
return output;
}
return truncate(String(value));
}
function readString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
function readNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function readBoolean(value: unknown): boolean | undefined {
return typeof value === "boolean" ? value : undefined;
}
function toMessageRecord(message: SDKMessage): Record<string, unknown> {
return message as unknown as Record<string, unknown>;
}
function toMessageSubtype(message: SDKMessage): string | undefined {
return readString(toMessageRecord(message).subtype);
}
function toMessageSessionId(message: SDKMessage): string | undefined {
return readString(toMessageRecord(message).session_id);
}
function toTaskNotificationSummary(message: SDKMessage): {
summary: string;
data?: JsonObject;
} {
const raw = toMessageRecord(message);
const status = readString(raw.status) ?? "unknown";
const data: JsonObject = {
status,
};
const taskId = readString(raw.task_id);
if (taskId) {
data.taskId = taskId;
}
const summaryText = readString(raw.summary);
if (summaryText) {
data.summary = truncate(summaryText);
}
const outputFile = readString(raw.output_file);
if (outputFile) {
data.outputFile = outputFile;
}
if (raw.usage !== undefined) {
data.usage = sanitizeJsonValue(raw.usage);
}
return {
summary: `Task notification: ${status}.`,
data,
};
}
function toTaskStartedSummary(message: SDKMessage): {
summary: string;
data?: JsonObject;
} {
const raw = toMessageRecord(message);
const data: JsonObject = {};
const taskId = readString(raw.task_id);
if (taskId) {
data.taskId = taskId;
}
const description = readString(raw.description);
if (description) {
data.description = truncate(description);
}
const taskType = readString(raw.task_type);
if (taskType) {
data.taskType = taskType;
}
const toolUseId = readString(raw.tool_use_id);
if (toolUseId) {
data.toolUseId = toolUseId;
}
return {
summary: "Task started.",
...(Object.keys(data).length > 0 ? { data } : {}),
};
}
function toMessageSummary(message: SDKMessage): {
summary: string;
data?: JsonObject;
} {
const subtype = toMessageSubtype(message);
const raw = toMessageRecord(message);
if (message.type === "result") {
if (message.subtype === "success") {
return {
summary: "Claude query result success.",
data: {
stopReason: message.stop_reason ?? null,
numTurns: message.num_turns,
usage: sanitizeJsonValue(message.usage) as JsonObject,
totalCostUsd: message.total_cost_usd,
},
};
}
return {
summary: `Claude query result ${message.subtype}.`,
data: {
stopReason: message.stop_reason ?? null,
numTurns: message.num_turns,
usage: sanitizeJsonValue(message.usage) as JsonObject,
totalCostUsd: message.total_cost_usd,
errors: sanitizeJsonValue(message.errors),
},
};
}
if (message.type === "tool_progress") {
return {
summary: `Tool progress: ${message.tool_name}.`,
data: {
toolName: message.tool_name,
toolUseId: message.tool_use_id,
elapsedTimeSeconds: message.elapsed_time_seconds,
parentToolUseId: message.parent_tool_use_id ?? null,
...(message.task_id ? { taskId: message.task_id } : {}),
},
};
}
if (message.type === "tool_use_summary") {
return {
summary: "Tool use summary emitted.",
data: {
summary: truncate(message.summary),
precedingToolUseIds: sanitizeJsonValue(message.preceding_tool_use_ids),
},
};
}
if (message.type === "stream_event") {
const data: JsonObject = {};
const eventType = readString((raw.event as Record<string, unknown> | undefined)?.type);
if (eventType) {
data.eventType = eventType;
}
const parentToolUseId = readString(raw.parent_tool_use_id);
if (parentToolUseId) {
data.parentToolUseId = parentToolUseId;
}
return {
summary: "Partial assistant stream event emitted.",
...(Object.keys(data).length > 0 ? { data } : {}),
};
}
if (message.type === "auth_status") {
return {
summary: message.isAuthenticating ? "Authentication in progress." : "Authentication status update.",
data: {
isAuthenticating: message.isAuthenticating,
output: sanitizeJsonValue(message.output),
...(message.error ? { error: truncate(message.error) } : {}),
},
};
}
if (message.type === "assistant") {
return {
summary: "Assistant message emitted.",
data: {
parentToolUseId: message.parent_tool_use_id ?? null,
...(message.error ? { error: message.error } : {}),
},
};
}
if (message.type === "user") {
const data: JsonObject = {
parentToolUseId: (message as { parent_tool_use_id?: string | null }).parent_tool_use_id ?? null,
};
const isSynthetic = readBoolean(raw.isSynthetic);
if (isSynthetic !== undefined) {
data.isSynthetic = isSynthetic;
}
const isReplay = readBoolean(raw.isReplay);
if (isReplay !== undefined) {
data.isReplay = isReplay;
}
return {
summary: "User message emitted.",
data,
};
}
if (subtype === "task_notification") {
return toTaskNotificationSummary(message);
}
if (subtype === "task_started") {
return toTaskStartedSummary(message);
}
if (message.type === "system" && subtype === "files_persisted") {
const files = Array.isArray(raw.files) ? raw.files : [];
const failed = Array.isArray(raw.failed) ? raw.failed : [];
return {
summary: "System event: files_persisted.",
data: {
persistedFileCount: files.length,
failedFileCount: failed.length,
},
};
}
if (message.type === "system" && subtype === "compact_boundary") {
return {
summary: "System event: compact_boundary.",
data: {
compactMetadata: sanitizeJsonValue(raw.compact_metadata),
},
};
}
if (message.type === "system" && subtype === "status") {
const data: JsonObject = {
status: readString(raw.status) ?? "none",
};
const permissionMode = readString(raw.permissionMode);
if (permissionMode) {
data.permissionMode = permissionMode;
}
return {
summary: "System event: status.",
data,
};
}
if (message.type === "system" && (subtype === "hook_started" || subtype === "hook_progress" || subtype === "hook_response")) {
const data: JsonObject = {
...(subtype ? { subtype } : {}),
...(readString(raw.hook_id) ? { hookId: readString(raw.hook_id) } : {}),
...(readString(raw.hook_name) ? { hookName: readString(raw.hook_name) } : {}),
...(readString(raw.hook_event) ? { hookEvent: readString(raw.hook_event) } : {}),
...(readString(raw.outcome) ? { outcome: readString(raw.outcome) } : {}),
};
if (raw.exit_code !== undefined) {
data.exitCode = sanitizeJsonValue(raw.exit_code);
}
return {
summary: `System event: ${subtype}.`,
data,
};
}
if (message.type === "system") {
return {
summary: subtype ? `System event: ${subtype}.` : "System event emitted.",
data: subtype ? { subtype } : undefined,
};
}
if (message.type === "rate_limit") {
return {
summary: "Rate limit event emitted.",
data: sanitizeJsonValue(raw) as JsonObject,
};
}
if (message.type === "prompt_suggestion") {
const data: JsonObject = {
...(readString(raw.prompt) ? { prompt: truncate(readString(raw.prompt) as string) } : {}),
...(readString(raw.suggestion) ? { suggestion: truncate(readString(raw.suggestion) as string) } : {}),
};
return {
summary: "Prompt suggestion emitted.",
...(Object.keys(data).length > 0 ? { data } : {}),
};
}
return {
summary: `Claude SDK message received (${message.type}).`,
};
}
function toRecord(input: {
stage: ClaudeTraceRecord["stage"];
message: string;
context: ClaudeTraceContext;
sdkMessageType?: string;
sdkMessageSubtype?: string;
sdkSessionId?: string;
data?: JsonObject;
}): ClaudeTraceRecord {
return {
id: randomUUID(),
timestamp: new Date().toISOString(),
source: "claude_sdk",
stage: input.stage,
message: input.message,
sessionId: input.context.sessionId,
nodeId: input.context.nodeId,
attempt: input.context.attempt,
depth: input.context.depth,
...(input.sdkMessageType ? { sdkMessageType: input.sdkMessageType } : {}),
...(input.sdkMessageSubtype ? { sdkMessageSubtype: input.sdkMessageSubtype } : {}),
...(input.sdkSessionId ? { sdkSessionId: input.sdkSessionId } : {}),
...(input.data ? { data: input.data } : {}),
};
}
export function summarizeClaudeMessage(
message: SDKMessage,
verbosity: ClaudeObservabilityVerbosity,
): {
messageType: string;
messageSubtype?: string;
sdkSessionId?: string;
summary: string;
data?: JsonObject;
} {
const messageSubtype = toMessageSubtype(message);
const sdkSessionId = toMessageSessionId(message);
const summary = toMessageSummary(message);
if (verbosity === "full") {
return {
messageType: message.type,
...(messageSubtype ? { messageSubtype } : {}),
...(sdkSessionId ? { sdkSessionId } : {}),
summary: summary.summary,
data: {
message: sanitizeJsonValue(message) as JsonObject,
},
};
}
return {
messageType: message.type,
...(messageSubtype ? { messageSubtype } : {}),
...(sdkSessionId ? { sdkSessionId } : {}),
summary: summary.summary,
...(summary.data ? { data: summary.data } : {}),
};
}
export class ClaudeObservabilityLogger {
private readonly mode: ClaudeObservabilityMode;
private readonly verbosity: ClaudeObservabilityVerbosity;
private readonly logPath: string;
private readonly includePartialMessages: boolean;
private readonly debug: boolean;
private readonly debugLogPath?: string;
private readonly pendingWrites = new Set<Promise<void>>();
private readonly stdoutProgressByKey = new Map<string, {
lastEmittedAt: number;
suppressed: number;
}>();
private readonly fileProgressByKey = new Map<string, {
lastEmittedAt: number;
suppressed: number;
}>();
private readonly stdoutStreamByKey = new Map<string, {
lastEmittedAt: number;
suppressed: number;
}>();
private readonly fileStreamByKey = new Map<string, {
lastEmittedAt: number;
suppressed: number;
}>();
private fileWriteFailureCount = 0;
constructor(input: {
workspaceRoot: string;
config: ClaudeObservabilityRuntimeConfig;
}) {
this.mode = input.config.mode;
this.verbosity = input.config.verbosity;
this.logPath = resolve(input.workspaceRoot, input.config.logPath);
this.includePartialMessages = input.config.includePartialMessages;
this.debug = input.config.debug;
this.debugLogPath = input.config.debugLogPath
? resolve(input.workspaceRoot, input.config.debugLogPath)
: undefined;
}
isEnabled(): boolean {
return this.mode !== "off";
}
toOptionOverrides(input: {
context: ClaudeTraceContext;
}): Pick<Options, "includePartialMessages" | "debug" | "debugFile" | "stderr"> {
return {
includePartialMessages: this.includePartialMessages,
debug: this.debug || this.debugLogPath !== undefined,
...(this.debugLogPath ? { debugFile: this.debugLogPath } : {}),
stderr: (data: string): void => {
this.record({
stage: "query.stderr",
message: "Claude SDK stderr output.",
context: input.context,
data: {
stderr: sanitizeJsonValue(data),
},
});
},
};
}
recordQueryStarted(input: {
context: ClaudeTraceContext;
data?: JsonObject;
}): void {
this.record({
stage: "query.started",
message: "Claude query started.",
context: input.context,
...(input.data ? { data: input.data } : {}),
});
}
recordMessage(input: {
context: ClaudeTraceContext;
message: SDKMessage;
}): void {
const summarized = summarizeClaudeMessage(input.message, this.verbosity);
this.record({
stage: "query.message",
message: summarized.summary,
context: input.context,
sdkMessageType: summarized.messageType,
sdkMessageSubtype: summarized.messageSubtype,
sdkSessionId: summarized.sdkSessionId,
...(summarized.data ? { data: summarized.data } : {}),
});
}
recordQueryCompleted(input: {
context: ClaudeTraceContext;
data?: JsonObject;
}): void {
this.record({
stage: "query.completed",
message: "Claude query completed.",
context: input.context,
...(input.data ? { data: input.data } : {}),
});
}
recordQueryError(input: {
context: ClaudeTraceContext;
error: unknown;
}): void {
const errorMessage = input.error instanceof Error ? input.error.message : String(input.error);
this.record({
stage: "query.error",
message: "Claude query failed.",
context: input.context,
data: {
error: truncate(errorMessage),
},
});
}
async close(): Promise<void> {
await Promise.all([...this.pendingWrites]);
}
private record(input: {
stage: ClaudeTraceRecord["stage"];
message: string;
context: ClaudeTraceContext;
sdkMessageType?: string;
sdkMessageSubtype?: string;
sdkSessionId?: string;
data?: JsonObject;
}): void {
if (!this.isEnabled()) {
return;
}
const record = toRecord(input);
if (this.mode === "stdout" || this.mode === "both") {
const stdoutRecord = this.toStdoutRecord(record);
if (stdoutRecord) {
console.log(`[claude-trace] ${JSON.stringify(stdoutRecord)}`);
}
}
if (this.mode === "file" || this.mode === "both") {
const fileRecord = this.toFileRecord(record);
if (!fileRecord) {
return;
}
const line = JSON.stringify(fileRecord);
const write = mkdir(dirname(this.logPath), { recursive: true })
.then(() => appendFile(this.logPath, `${line}\n`, "utf8"))
.catch((error: unknown) => {
this.reportFileWriteFailure(error);
})
.finally(() => {
this.pendingWrites.delete(write);
});
this.pendingWrites.add(write);
}
}
private toStdoutRecord(record: ClaudeTraceRecord): ClaudeTraceRecord | undefined {
return this.toFilteredMessageRecord(record, "stdout");
}
private toFileRecord(record: ClaudeTraceRecord): ClaudeTraceRecord | undefined {
return this.toFilteredMessageRecord(record, "file");
}
private toFilteredMessageRecord(
record: ClaudeTraceRecord,
destination: "stdout" | "file",
): ClaudeTraceRecord | undefined {
if (record.stage !== "query.message") {
return record;
}
if (!record.sdkMessageType) {
return record;
}
if (record.sdkMessageType === "tool_progress") {
return this.toSampledToolProgressRecord(record, destination);
}
if (record.sdkMessageType === "stream_event") {
if (!this.includePartialMessages) {
return undefined;
}
return this.toSampledStreamEventRecord(record, destination);
}
if (record.sdkMessageType === "auth_status") {
const data = record.data;
const isAuthenticating = data?.isAuthenticating === true;
const hasError = typeof data?.error === "string" && data.error.trim().length > 0;
if (hasError || !isAuthenticating) {
return record;
}
return undefined;
}
return record;
}
private toSampledToolProgressRecord(
record: ClaudeTraceRecord,
destination: "stdout" | "file",
): ClaudeTraceRecord | undefined {
const now = Date.now();
const minIntervalMs = destination === "stdout" ? 1000 : 2000;
const rawToolUseId = record.data?.toolUseId;
const toolUseId = typeof rawToolUseId === "string" ? rawToolUseId : "unknown";
const key = `${destination}:${record.sessionId}:${record.nodeId}:${toolUseId}`;
const progressByKey = destination === "stdout" ? this.stdoutProgressByKey : this.fileProgressByKey;
const state = progressByKey.get(key);
if (!state) {
progressByKey.set(key, {
lastEmittedAt: now,
suppressed: 0,
});
return record;
}
if (now - state.lastEmittedAt < minIntervalMs) {
state.suppressed += 1;
return undefined;
}
state.lastEmittedAt = now;
const suppressed = state.suppressed;
state.suppressed = 0;
if (suppressed < 1) {
return record;
}
const nextData: JsonObject = {
...(record.data ?? {}),
suppressedSinceLastEmit: suppressed,
};
return {
...record,
data: nextData,
};
}
private toSampledStreamEventRecord(
record: ClaudeTraceRecord,
destination: "stdout" | "file",
): ClaudeTraceRecord | undefined {
const now = Date.now();
const minIntervalMs = destination === "stdout" ? 700 : 1200;
const key = `${destination}:${record.sessionId}:${record.nodeId}:stream`;
const streamByKey = destination === "stdout" ? this.stdoutStreamByKey : this.fileStreamByKey;
const state = streamByKey.get(key);
if (!state) {
streamByKey.set(key, {
lastEmittedAt: now,
suppressed: 0,
});
return record;
}
if (now - state.lastEmittedAt < minIntervalMs) {
state.suppressed += 1;
return undefined;
}
state.lastEmittedAt = now;
const suppressed = state.suppressed;
state.suppressed = 0;
if (suppressed < 1) {
return record;
}
const nextData: JsonObject = {
...(record.data ?? {}),
suppressedStreamEventsSinceLastEmit: suppressed,
};
return {
...record,
data: nextData,
};
}
private reportFileWriteFailure(error: unknown): void {
this.fileWriteFailureCount += 1;
if (this.fileWriteFailureCount <= 5) {
const message = error instanceof Error ? error.message : String(error);
console.warn(
`[claude-trace] failed to append trace log to ${this.logPath}: ${truncate(message, 180)}`,
);
return;
}
if (this.fileWriteFailureCount === 6) {
console.warn("[claude-trace] additional trace-log write failures suppressed.");
}
}
}
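
The per-destination throttling in `toSampledToolProgressRecord` and `toSampledStreamEventRecord` follows one pattern; a minimal sketch with an injected clock (so it is testable) looks like this — names are illustrative:

```typescript
type SampleState = { lastEmittedAt: number; suppressed: number };
type TraceRecord = { stage: string; data?: Record<string, unknown> };

// Emit at most one record per key per interval; when a record finally
// passes, annotate it with how many were suppressed since the last emit.
function sampleRecord(
  record: TraceRecord,
  key: string,
  minIntervalMs: number,
  states: Map<string, SampleState>,
  now: number,
): TraceRecord | undefined {
  const state = states.get(key);
  if (!state) {
    states.set(key, { lastEmittedAt: now, suppressed: 0 });
    return record;
  }
  if (now - state.lastEmittedAt < minIntervalMs) {
    state.suppressed += 1;
    return undefined;
  }
  state.lastEmittedAt = now;
  const suppressed = state.suppressed;
  state.suppressed = 0;
  if (suppressed < 1) {
    return record;
  }
  return {
    ...record,
    data: { ...(record.data ?? {}), suppressedSinceLastEmit: suppressed },
  };
}
```

The logger keeps separate state maps per destination, which is why stdout (1s / 700ms intervals) and file (2s / 1200ms) throttle independently for the same session and node.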

View File

@@ -0,0 +1,85 @@
import { readFile } from "node:fs/promises";
import { resolve } from "node:path";
export type ClaudeTraceEvent = {
timestamp: string;
message: string;
stage?: string;
sessionId?: string;
sdkMessageType?: string;
sdkMessageSubtype?: string;
data?: unknown;
} & Record<string, unknown>;
type ClaudeTraceFilter = {
sessionId?: string;
limit?: number;
};
function safeParseLine(line: string): ClaudeTraceEvent | undefined {
const trimmed = line.trim();
if (!trimmed) {
return undefined;
}
try {
const parsed = JSON.parse(trimmed) as unknown;
if (!parsed || typeof parsed !== "object") {
return undefined;
}
const record = parsed as Record<string, unknown>;
if (typeof record.timestamp !== "string" || typeof record.message !== "string") {
return undefined;
}
return record as ClaudeTraceEvent;
} catch {
return undefined;
}
}
export async function readClaudeTraceEvents(logPath: string): Promise<ClaudeTraceEvent[]> {
const absolutePath = resolve(logPath);
let content = "";
try {
content = await readFile(absolutePath, "utf8");
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
return [];
}
throw error;
}
const parsed: ClaudeTraceEvent[] = [];
for (const line of content.split(/\r?\n/)) {
const event = safeParseLine(line);
if (event) {
parsed.push(event);
}
}
parsed.sort((left, right) => left.timestamp.localeCompare(right.timestamp));
return parsed;
}
export function filterClaudeTraceEvents(
events: readonly ClaudeTraceEvent[],
filter: ClaudeTraceFilter,
): ClaudeTraceEvent[] {
const filtered: ClaudeTraceEvent[] = [];
for (const event of events) {
if (filter.sessionId && event.sessionId !== filter.sessionId) {
continue;
}
filtered.push(event);
}
if (!filter.limit || filter.limit < 1 || filtered.length <= filter.limit) {
return filtered;
}
return filtered.slice(-filter.limit);
}
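
End to end, the reader above amounts to: split NDJSON, drop blank, malformed, or schema-less lines, then sort chronologically before filtering. A compact self-contained sketch of the parse step (mirroring `safeParseLine` and `readClaudeTraceEvents`; `parseTraceLines` is an illustrative name):

```typescript
type TraceEvent = { timestamp: string; message: string; sessionId?: string } & Record<string, unknown>;

// Parse NDJSON content into events, skipping lines that are empty, not
// valid JSON, or missing the required timestamp/message fields, then
// order chronologically.
function parseTraceLines(content: string): TraceEvent[] {
  const events: TraceEvent[] = [];
  for (const line of content.split(/\r?\n/)) {
    const trimmed = line.trim();
    if (!trimmed) continue;
    try {
      const parsed = JSON.parse(trimmed) as Record<string, unknown>;
      if (typeof parsed.timestamp === "string" && typeof parsed.message === "string") {
        events.push(parsed as TraceEvent);
      }
    } catch {
      // Corrupt lines (e.g. a torn append) are skipped rather than fatal.
    }
  }
  return events.sort((left, right) => left.timestamp.localeCompare(right.timestamp));
}
```

Combined with `filterClaudeTraceEvents`, this lets a UI tail the last N events for a single session straight from the trace file.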

View File

@@ -23,6 +23,7 @@ export type LimitSettings = {
topologyMaxDepth: number;
topologyMaxRetries: number;
relationshipMaxChildren: number;
mergeConflictMaxAttempts: number;
portBase: number;
portBlockSize: number;
portBlockCount: number;
@@ -38,6 +39,7 @@ export type UiConfigSnapshot = {
stateRoot: string;
projectContextPath: string;
runtimeEventLogPath: string;
claudeTraceLogPath: string;
securityAuditLogPath: string;
};
};
@@ -88,6 +90,7 @@ function toLimits(config: Readonly<AppConfig>): LimitSettings {
topologyMaxDepth: config.orchestration.maxDepth,
topologyMaxRetries: config.orchestration.maxRetries,
relationshipMaxChildren: config.orchestration.maxChildren,
mergeConflictMaxAttempts: config.orchestration.mergeConflictMaxAttempts,
portBase: config.provisioning.portRange.basePort,
portBlockSize: config.provisioning.portRange.blockSize,
portBlockCount: config.provisioning.portRange.blockCount,
@@ -105,6 +108,7 @@ function toSnapshot(config: Readonly<AppConfig>, envFilePath: string): UiConfigS
stateRoot: config.orchestration.stateRoot,
projectContextPath: config.orchestration.projectContextPath,
runtimeEventLogPath: config.runtimeEvents.logPath,
claudeTraceLogPath: config.provider.claudeObservability.logPath,
securityAuditLogPath: config.security.auditLogPath,
},
};
@@ -170,6 +174,7 @@ export class UiConfigStore {
AGENT_TOPOLOGY_MAX_DEPTH: String(input.topologyMaxDepth),
AGENT_TOPOLOGY_MAX_RETRIES: String(input.topologyMaxRetries),
AGENT_RELATIONSHIP_MAX_CHILDREN: String(input.relationshipMaxChildren),
AGENT_MERGE_CONFLICT_MAX_ATTEMPTS: String(input.mergeConflictMaxAttempts),
AGENT_PORT_BASE: String(input.portBase),
AGENT_PORT_BLOCK_SIZE: String(input.portBlockSize),
AGENT_PORT_BLOCK_COUNT: String(input.portBlockCount),

View File

@@ -10,6 +10,7 @@ import { isDomainEventType, type DomainEventEmission } from "../agents/domain-ev
import type { ActorExecutionInput, ActorExecutionResult, ActorExecutor } from "../agents/pipeline.js";
import { isRecord, type JsonObject, type JsonValue } from "../agents/types.js";
import { createSessionContext, type SessionContext } from "../examples/session-context.js";
import { ClaudeObservabilityLogger } from "./claude-observability.js";
export type RunProvider = "codex" | "claude";
@@ -17,6 +18,7 @@ export type ProviderRunRuntime = {
provider: RunProvider;
config: Readonly<AppConfig>;
sessionContext: SessionContext;
claudeObservability: ClaudeObservabilityLogger;
close: () => Promise<void>;
};
@@ -333,7 +335,7 @@ function buildActorPrompt(input: ActorExecutionInput): string {
},
events: [
{
type: "requirements_defined | tasks_planned | code_committed | task_blocked | validation_passed | validation_failed | branch_merged",
type: "requirements_defined | tasks_planned | code_committed | task_ready_for_review | task_blocked | validation_passed | validation_failed | branch_merged | merge_conflict_detected | merge_conflict_resolved | merge_conflict_unresolved | merge_retry_started",
payload: {
summary: "optional",
details: {},
@@ -416,6 +418,40 @@ type ClaudeTurnResult = {
usage: ProviderUsage;
};
function toClaudeTraceContext(actorInput: ActorExecutionInput): {
sessionId: string;
nodeId: string;
attempt: number;
depth: number;
} {
return {
sessionId: actorInput.sessionId,
nodeId: actorInput.node.id,
attempt: actorInput.attempt,
depth: actorInput.depth,
};
}
function toProviderUsageJson(usage: ProviderUsage): JsonObject {
const output: JsonObject = {};
if (typeof usage.tokenInput === "number") {
output.tokenInput = usage.tokenInput;
}
if (typeof usage.tokenOutput === "number") {
output.tokenOutput = usage.tokenOutput;
}
if (typeof usage.tokenTotal === "number") {
output.tokenTotal = usage.tokenTotal;
}
if (typeof usage.durationMs === "number") {
output.durationMs = usage.durationMs;
}
if (typeof usage.costUsd === "number") {
output.costUsd = usage.costUsd;
}
return output;
}
function buildClaudeOptions(input: {
runtime: ProviderRunRuntime;
actorInput: ActorExecutionInput;
@@ -433,6 +469,7 @@ function buildClaudeOptions(input: {
...runtime.sessionContext.runtimeInjection.env,
...buildClaudeAuthEnv(runtime.config.provider),
};
const traceContext = toClaudeTraceContext(actorInput);
return {
maxTurns: CLAUDE_PROVIDER_MAX_TURNS,
@@ -449,6 +486,9 @@ function buildClaudeOptions(input: {
canUseTool: actorInput.mcp.createClaudeCanUseTool(),
cwd: runtime.sessionContext.runtimeInjection.workingDirectory,
env: runtimeEnv,
...runtime.claudeObservability.toOptionOverrides({
context: traceContext,
}),
outputFormat: CLAUDE_OUTPUT_FORMAT,
};
}
@@ -458,10 +498,19 @@ async function runClaudeTurn(input: {
actorInput: ActorExecutionInput;
prompt: string;
}): Promise<ClaudeTurnResult> {
const traceContext = toClaudeTraceContext(input.actorInput);
const options = buildClaudeOptions({
runtime: input.runtime,
actorInput: input.actorInput,
});
input.runtime.claudeObservability.recordQueryStarted({
context: traceContext,
data: {
...(options.model ? { model: options.model } : {}),
maxTurns: options.maxTurns ?? CLAUDE_PROVIDER_MAX_TURNS,
cwd: input.runtime.sessionContext.runtimeInjection.workingDirectory,
},
});
const startedAt = Date.now();
const stream = query({
@@ -472,6 +521,7 @@ async function runClaudeTurn(input: {
let resultText = "";
let structuredOutput: unknown;
let usage: ProviderUsage = {};
let messageCount = 0;
const onAbort = (): void => {
stream.close();
@@ -481,6 +531,12 @@ async function runClaudeTurn(input: {
try {
for await (const message of stream as AsyncIterable<SDKMessage>) {
messageCount += 1;
input.runtime.claudeObservability.recordMessage({
context: traceContext,
message,
});
if (message.type !== "result") {
continue;
}
@@ -502,6 +558,12 @@ async function runClaudeTurn(input: {
costUsd: message.total_cost_usd,
};
}
} catch (error) {
input.runtime.claudeObservability.recordQueryError({
context: traceContext,
error,
});
throw error;
} finally {
input.actorInput.signal.removeEventListener("abort", onAbort);
stream.close();
@@ -512,9 +574,22 @@ async function runClaudeTurn(input: {
}
if (!resultText) {
throw new Error("Claude run completed without a final result.");
const error = new Error("Claude run completed without a final result.");
input.runtime.claudeObservability.recordQueryError({
context: traceContext,
error,
});
throw error;
}
input.runtime.claudeObservability.recordQueryCompleted({
context: traceContext,
data: {
messageCount,
usage: toProviderUsageJson(usage),
},
});
return {
text: resultText,
structuredOutput,
@@ -553,18 +628,30 @@ export async function createProviderRunRuntime(input: {
provider: RunProvider;
initialPrompt: string;
config: Readonly<AppConfig>;
projectPath: string;
observabilityRootPath?: string;
}): Promise<ProviderRunRuntime> {
const sessionContext = await createSessionContext(input.provider, {
prompt: input.initialPrompt,
config: input.config,
workspaceRoot: input.projectPath,
});
const claudeObservability = new ClaudeObservabilityLogger({
workspaceRoot: input.observabilityRootPath ?? input.projectPath,
config: input.config.provider.claudeObservability,
});
return {
provider: input.provider,
config: input.config,
sessionContext,
claudeObservability,
close: async () => {
await sessionContext.close();
try {
await sessionContext.close();
} finally {
await claudeObservability.close();
}
},
};
}
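The observability logger threaded through `runClaudeTurn` appends one JSON object per line (NDJSON, per the `CLAUDE_OBSERVABILITY_LOG_PATH` default of `claude-trace.ndjson`). A minimal sketch of building such a record for the lifecycle stages recorded above; the field names are assumptions and the real `ClaudeObservabilityLogger` schema may differ:

```typescript
type TraceContext = {
  sessionId: string;
  nodeId: string;
  attempt: number;
  depth: number;
};

// Illustrative: one NDJSON line per lifecycle stage
// (query.started, query.message, query.completed, query.error).
function buildTraceLine(
  stage: string,
  context: TraceContext,
  data: Record<string, unknown>,
): string {
  const record = { timestamp: new Date().toISOString(), stage, ...context, data };
  // Newline-terminated so the file can be tailed and parsed line by line.
  return `${JSON.stringify(record)}\n`;
}
```

Keeping the trace context flat (sessionId, nodeId, attempt, depth) is what lets the UI filter `/api/claude-trace` by `sessionId` without parsing nested payloads.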


@@ -2,6 +2,7 @@ const state = {
config: null,
manifests: [],
sessions: [],
sessionMetadata: [],
runs: [],
selectedSessionId: "",
selectedManifestPath: "",
@@ -25,13 +26,22 @@ const dom = {
runProvider: document.querySelector("#run-provider"),
runTopologyHint: document.querySelector("#run-topology-hint"),
runFlags: document.querySelector("#run-flags"),
runRuntimeContext: document.querySelector("#run-runtime-context"),
runValidationNodes: document.querySelector("#run-validation-nodes"),
killRun: document.querySelector("#kill-run"),
runStatus: document.querySelector("#run-status"),
sessionForm: document.querySelector("#session-form"),
sessionProjectPath: document.querySelector("#session-project-path"),
sessionCreate: document.querySelector("#session-create"),
sessionClose: document.querySelector("#session-close"),
sessionCloseMerge: document.querySelector("#session-close-merge"),
nodeInspector: document.querySelector("#node-inspector"),
eventsLimit: document.querySelector("#events-limit"),
eventsRefresh: document.querySelector("#events-refresh"),
eventFeed: document.querySelector("#event-feed"),
claudeEventsLimit: document.querySelector("#claude-events-limit"),
claudeEventsRefresh: document.querySelector("#claude-events-refresh"),
claudeEventFeed: document.querySelector("#claude-event-feed"),
historyRefresh: document.querySelector("#history-refresh"),
historyBody: document.querySelector("#history-body"),
notificationsForm: document.querySelector("#notifications-form"),
@@ -77,6 +87,7 @@ const dom = {
cfgTopologyDepth: document.querySelector("#cfg-topology-depth"),
cfgTopologyRetries: document.querySelector("#cfg-topology-retries"),
cfgRelationshipChildren: document.querySelector("#cfg-relationship-children"),
cfgMergeConflictAttempts: document.querySelector("#cfg-merge-conflict-attempts"),
cfgPortBase: document.querySelector("#cfg-port-base"),
cfgPortBlockSize: document.querySelector("#cfg-port-block-size"),
cfgPortBlockCount: document.querySelector("#cfg-port-block-count"),
@@ -111,15 +122,120 @@ const MANIFEST_EVENT_TRIGGERS = [
"requirements_defined",
"tasks_planned",
"code_committed",
"task_ready_for_review",
"task_blocked",
"validation_passed",
"validation_failed",
"branch_merged",
"merge_conflict_detected",
"merge_conflict_resolved",
"merge_conflict_unresolved",
"merge_retry_started",
];
const RUN_MANIFEST_EDITOR_VALUE = "__editor__";
const RUN_MANIFEST_EDITOR_LABEL = "[Use Manifest Editor JSON]";
const LABEL_HELP_BY_CONTROL = Object.freeze({
"session-select": "Select which session the graph and feed should focus on.",
"graph-manifest-select": "Choose the manifest context used when rendering the selected session graph.",
"run-prompt": "Describe the task objective you want the run to complete.",
"run-manifest-select": "Choose a saved manifest or use the JSON currently in the editor.",
"run-execution-mode": "Use provider for live model execution or mock for simulated execution.",
"run-provider": "Choose which model provider backend handles provider-mode runs.",
"run-topology-hint": "Optional hint that nudges orchestration toward a topology strategy.",
"run-flags": "Optional JSON object passed in as initial run flags.",
"run-runtime-context": "Optional JSON object of template values injected into persona prompts (for example repo or ticket).",
"run-validation-nodes": "Optional comma-separated node IDs to simulate validation outcomes for.",
"session-project-path": "Absolute project path used when creating an explicit managed session.",
"session-close-merge": "When enabled, close will merge the session base branch back into the project branch.",
"events-limit": "Set how many recent runtime events are loaded per refresh.",
"claude-events-limit": "Set how many Claude SDK trace records are loaded per refresh.",
"cfg-webhook-url": "Webhook endpoint that receives runtime event notifications.",
"cfg-webhook-severity": "Minimum severity level that triggers webhook notifications.",
"cfg-webhook-always": "Event types that should always notify, regardless of severity.",
"cfg-security-mode": "Policy behavior used when a command violates security rules.",
"cfg-security-binaries": "Comma-separated command binaries permitted by policy.",
"cfg-security-timeout": "Maximum command execution time before forced timeout.",
"cfg-security-inherit": "Environment variable names to pass through to subprocesses.",
"cfg-security-scrub": "Environment variable names to strip before command execution.",
"cfg-limit-concurrent": "Maximum number of agents that can run concurrently across sessions.",
"cfg-limit-session": "Maximum number of agents that can run concurrently within a single session.",
"cfg-limit-depth": "Maximum recursive spawn depth allowed for agent tasks.",
"cfg-topology-depth": "Maximum orchestration graph depth permitted by topology rules.",
"cfg-topology-retries": "Maximum retry expansions allowed by topology orchestration.",
"cfg-relationship-children": "Maximum children each persona relationship can spawn.",
"cfg-merge-conflict-attempts": "Maximum merge-conflict resolution attempts before emitting unresolved conflict events.",
"cfg-port-base": "Starting port number for provisioning port allocations.",
"cfg-port-block-size": "Number of ports reserved per allocated block.",
"cfg-port-block-count": "Number of port blocks available for allocation.",
"cfg-port-primary-offset": "Offset within each block used for the primary service port.",
"manifest-path": "Workspace-relative manifest file path to load, validate, or save.",
"helper-topology-sequential": "Allow sequential execution topology in this manifest.",
"helper-topology-parallel": "Allow parallel execution topology in this manifest.",
"helper-topology-hierarchical": "Allow hierarchical parent-child execution topology.",
"helper-topology-retry-unrolled": "Allow retry-unrolled topology for explicit retry paths.",
"helper-topology-max-depth": "Top-level cap on orchestration depth in this manifest.",
"helper-topology-max-retries": "Top-level cap on retry attempts in this manifest.",
"helper-entry-node-id": "Node ID used as the pipeline entry point.",
"helper-persona-id": "Stable persona identifier referenced by nodes and relationships.",
"helper-persona-display-name": "Human-readable persona name shown in summaries and tooling.",
"helper-persona-model-constraint": "Optional model restriction for this persona only.",
"helper-persona-system-prompt": "Base prompt template that defines persona behavior.",
"helper-persona-allowlist": "Comma-separated tool names this persona may use.",
"helper-persona-banlist": "Comma-separated tool names this persona must not use.",
"helper-relationship-parent": "Parent persona ID that can spawn or delegate to the child.",
"helper-relationship-child": "Child persona ID allowed under the selected parent.",
"helper-relationship-max-depth": "Optional override limiting recursion depth for this relationship.",
"helper-relationship-max-children": "Optional override limiting child fan-out for this relationship.",
"helper-node-id": "Unique pipeline node identifier used by entry node and edges.",
"helper-node-actor-id": "Runtime actor identifier assigned to this node.",
"helper-node-persona-id": "Persona applied when this node executes.",
"helper-node-topology-kind": "Optional node-level topology override.",
"helper-node-block-id": "Optional topology block identifier for grouped scheduling logic.",
"helper-node-max-retries": "Optional node-level retry limit override.",
"helper-edge-from": "Source node where this edge starts.",
"helper-edge-to": "Target node activated when this edge condition matches.",
"helper-edge-trigger-kind": "Choose whether edge activation is status-based or event-based.",
"helper-edge-on": "Node status value that triggers this edge when using status mode.",
"helper-edge-event": "Domain event name that triggers this edge when using event mode.",
"helper-edge-when": "Optional JSON array of additional conditions required to follow the edge.",
});
function extractLabelText(label) {
const clone = label.cloneNode(true);
for (const field of clone.querySelectorAll("input, select, textarea")) {
field.remove();
}
return clone.textContent?.replace(/\s+/g, " ").trim() || "this field";
}
function applyLabelTooltips(root = document) {
for (const label of root.querySelectorAll("label")) {
const control = label.querySelector("input,select,textarea");
if (!control) {
continue;
}
let help = LABEL_HELP_BY_CONTROL[control.id] || "";
if (!help) {
for (const className of control.classList) {
help = LABEL_HELP_BY_CONTROL[className] || "";
if (help) {
break;
}
}
}
if (!help) {
const labelText = extractLabelText(label).toLowerCase();
help = `Set ${labelText} for this configuration.`;
}
label.title = help;
control.title = help;
}
}
function fmtMoney(value) {
return `$${Number(value || 0).toFixed(4)}`;
}
@@ -575,6 +691,8 @@ function renderManifestHelper() {
`;
})
.join("");
applyLabelTooltips(dom.manifestForm);
}
function readManifestDraftFromHelper() {
@@ -932,6 +1050,7 @@ async function loadConfig() {
dom.cfgTopologyDepth.value = String(limits.topologyMaxDepth);
dom.cfgTopologyRetries.value = String(limits.topologyMaxRetries);
dom.cfgRelationshipChildren.value = String(limits.relationshipMaxChildren);
dom.cfgMergeConflictAttempts.value = String(limits.mergeConflictMaxAttempts);
dom.cfgPortBase.value = String(limits.portBase);
dom.cfgPortBlockSize.value = String(limits.portBlockSize);
dom.cfgPortBlockCount.value = String(limits.portBlockCount);
@@ -963,11 +1082,28 @@ function statusChipClass(status) {
return `status-chip status-${status || "unknown"}`;
}
function getSessionLifecycleStatus(sessionId) {
const metadata = state.sessionMetadata.find((entry) => entry?.sessionId === sessionId);
if (!metadata) {
return undefined;
}
const status = metadata.sessionStatus;
if (status === "active" || status === "suspended" || status === "closed" || status === "closed_with_conflicts") {
return status;
}
return undefined;
}
function renderRunsAndSessionsTable() {
const rows = [];
for (const session of state.sessions) {
const sessionStatus = session.status || "unknown";
const lifecycleStatus = getSessionLifecycleStatus(session.sessionId);
const sessionStatus =
lifecycleStatus === "closed" || lifecycleStatus === "closed_with_conflicts"
? lifecycleStatus
: session.status || lifecycleStatus || "unknown";
rows.push(`
<tr data-session-id="${escapeHtml(session.sessionId)}">
<td>${escapeHtml(session.sessionId)}</td>
@@ -995,6 +1131,7 @@ function renderRunsAndSessionsTable() {
async function loadSessions() {
const payload = await apiRequest("/api/sessions");
state.sessions = payload.sessions || [];
state.sessionMetadata = payload.sessionMetadata || [];
state.runs = payload.runs || [];
if (!state.selectedSessionId && state.sessions.length > 0) {
@@ -1360,6 +1497,43 @@ function renderEventFeed(events) {
dom.eventFeed.innerHTML = rows || '<div class="event-row"><div class="event-time">-</div><div class="event-type">-</div><div>No runtime events.</div></div>';
}
function toClaudeRowSeverity(event) {
const stage = String(event?.stage || "");
const type = String(event?.sdkMessageType || "");
if (stage === "query.error") {
return "critical";
}
if (stage === "query.stderr" || (type === "result" && String(event?.sdkMessageSubtype || "").startsWith("error_"))) {
return "warning";
}
return "info";
}
function renderClaudeTraceFeed(events) {
const rows = [...events]
.reverse()
.map((event) => {
const ts = new Date(event.timestamp).toLocaleTimeString();
const stage = String(event.stage || "query.message");
const sdkMessageType = String(event.sdkMessageType || "");
const sdkMessageSubtype = String(event.sdkMessageSubtype || "");
const typeLabel = sdkMessageType
? `${stage}/${sdkMessageType}${sdkMessageSubtype ? `:${sdkMessageSubtype}` : ""}`
: stage;
const message = typeof event.message === "string" ? event.message : JSON.stringify(event.message || "");
return `
<div class="event-row ${escapeHtml(toClaudeRowSeverity(event))}">
<div class="event-time">${escapeHtml(ts)}</div>
<div class="event-type">${escapeHtml(typeLabel)}</div>
<div>${escapeHtml(message)}</div>
</div>
`;
})
.join("");
dom.claudeEventFeed.innerHTML = rows || '<div class="event-row"><div class="event-time">-</div><div class="event-type">-</div><div>No Claude trace events.</div></div>';
}
async function refreshEvents() {
const limit = Number(dom.eventsLimit.value || "150");
const params = new URLSearchParams({
@@ -1374,6 +1548,20 @@ async function refreshEvents() {
renderEventFeed(payload.events || []);
}
async function refreshClaudeTrace() {
const limit = Number(dom.claudeEventsLimit.value || "150");
const params = new URLSearchParams({
limit: String(limit),
});
if (state.selectedSessionId) {
params.set("sessionId", state.selectedSessionId);
}
const payload = await apiRequest(`/api/claude-trace?${params.toString()}`);
renderClaudeTraceFeed(payload.events || []);
}
async function startRun(event) {
event.preventDefault();
@@ -1389,6 +1577,12 @@ async function startRun(event) {
return;
}
const runtimeContext = parseJsonSafe(dom.runRuntimeContext.value, {});
if (typeof runtimeContext !== "object" || Array.isArray(runtimeContext) || !runtimeContext) {
showRunStatus("Runtime Context Overrides must be a JSON object.", true);
return;
}
const manifestSelection = dom.runManifestSelect.value.trim();
const payload = {
@@ -1397,9 +1591,21 @@ async function startRun(event) {
provider: dom.runProvider.value,
topologyHint: dom.runTopologyHint.value.trim() || undefined,
initialFlags: flags,
runtimeContextOverrides: runtimeContext,
simulateValidationNodeIds: fromCsv(dom.runValidationNodes.value),
};
const selectedSessionMetadata = state.sessionMetadata.find(
(entry) => entry?.sessionId === state.selectedSessionId,
);
if (
selectedSessionMetadata &&
(selectedSessionMetadata.sessionStatus === "active" ||
selectedSessionMetadata.sessionStatus === "suspended")
) {
payload.sessionId = selectedSessionMetadata.sessionId;
}
if (manifestSelection === RUN_MANIFEST_EDITOR_VALUE) {
const manifestFromEditor = parseJsonSafe(dom.manifestEditor.value, null);
if (!manifestFromEditor) {
@@ -1430,6 +1636,7 @@ async function startRun(event) {
dom.sessionSelect.value = run.sessionId;
await refreshGraph();
await refreshEvents();
await refreshClaudeTrace();
} catch (error) {
showRunStatus(error instanceof Error ? error.message : String(error), true);
}
@@ -1450,6 +1657,67 @@ async function cancelActiveRun() {
await loadSessions();
await refreshGraph();
await refreshEvents();
await refreshClaudeTrace();
} catch (error) {
showRunStatus(error instanceof Error ? error.message : String(error), true);
}
}
async function createSessionFromUi() {
const projectPath = dom.sessionProjectPath.value.trim();
if (!projectPath) {
showRunStatus("Project path is required to create a session.", true);
return;
}
try {
const payload = await apiRequest("/api/sessions", {
method: "POST",
body: JSON.stringify({
projectPath,
}),
});
const created = payload.session;
if (created?.sessionId) {
state.selectedSessionId = created.sessionId;
showRunStatus(`Session ${created.sessionId} created.`);
} else {
showRunStatus("Session created.");
}
await loadSessions();
if (state.selectedSessionId) {
dom.sessionSelect.value = state.selectedSessionId;
await refreshGraph();
await refreshEvents();
await refreshClaudeTrace();
}
} catch (error) {
showRunStatus(error instanceof Error ? error.message : String(error), true);
}
}
async function closeSelectedSessionFromUi() {
const sessionId = state.selectedSessionId || dom.sessionSelect.value;
if (!sessionId) {
showRunStatus("Select a session before closing.", true);
return;
}
try {
const payload = await apiRequest(`/api/sessions/${encodeURIComponent(sessionId)}/close`, {
method: "POST",
body: JSON.stringify({
mergeToProject: dom.sessionCloseMerge.checked,
}),
});
const nextStatus = payload?.session?.sessionStatus || "closed";
showRunStatus(`Session ${sessionId} closed with status ${nextStatus}.`);
await loadSessions();
await refreshGraph();
await refreshEvents();
await refreshClaudeTrace();
} catch (error) {
showRunStatus(error instanceof Error ? error.message : String(error), true);
}
@@ -1500,6 +1768,7 @@ async function saveLimits(event) {
topologyMaxDepth: Number(dom.cfgTopologyDepth.value),
topologyMaxRetries: Number(dom.cfgTopologyRetries.value),
relationshipMaxChildren: Number(dom.cfgRelationshipChildren.value),
mergeConflictMaxAttempts: Number(dom.cfgMergeConflictAttempts.value),
portBase: Number(dom.cfgPortBase.value),
portBlockSize: Number(dom.cfgPortBlockSize.value),
portBlockCount: Number(dom.cfgPortBlockCount.value),
@@ -1598,6 +1867,7 @@ function bindUiEvents() {
state.selectedSessionId = dom.sessionSelect.value;
await refreshGraph();
await refreshEvents();
await refreshClaudeTrace();
});
dom.graphManifestSelect.addEventListener("change", async () => {
@@ -1617,15 +1887,26 @@ function bindUiEvents() {
await refreshEvents();
});
dom.claudeEventsRefresh.addEventListener("click", async () => {
await refreshClaudeTrace();
});
dom.historyRefresh.addEventListener("click", async () => {
await loadSessions();
await refreshGraph();
await refreshClaudeTrace();
});
dom.runForm.addEventListener("submit", startRun);
dom.killRun.addEventListener("click", () => {
void cancelActiveRun();
});
dom.sessionCreate.addEventListener("click", () => {
void createSessionFromUi();
});
dom.sessionClose.addEventListener("click", () => {
void closeSelectedSessionFromUi();
});
dom.notificationsForm.addEventListener("submit", (event) => {
void saveNotifications(event);
@@ -1733,10 +2014,12 @@ async function refreshAll() {
await refreshGraph();
await refreshEvents();
await refreshClaudeTrace();
}
async function initialize() {
bindUiEvents();
applyLabelTooltips();
try {
await apiRequest("/api/health");
@@ -1762,6 +2045,10 @@ async function initialize() {
void refreshEvents();
}, 3000);
setInterval(() => {
void refreshClaudeTrace();
}, 3000);
setInterval(() => {
void refreshGraph();
}, 7000);


@@ -75,6 +75,10 @@
Initial Flags (JSON)
<textarea id="run-flags" rows="3" placeholder='{"needs_bootstrap": true}'></textarea>
</label>
<label>
Runtime Context Overrides (JSON)
<textarea id="run-runtime-context" rows="3" placeholder='{"repo":"ai_ops","ticket":"AIOPS-123"}'></textarea>
</label>
<label>
Simulate Validation Nodes (CSV)
<input id="run-validation-nodes" type="text" placeholder="coder-1,qa-1" />
@@ -86,6 +90,23 @@
</form>
<div id="run-status" class="subtle"></div>
<div class="divider"></div>
<h3>Session Controls</h3>
<form id="session-form" class="stacked-form">
<label>
Project Path (absolute)
<input id="session-project-path" type="text" placeholder="/abs/path/to/project" />
</label>
<label class="inline-checkbox">
<input id="session-close-merge" type="checkbox" />
Merge base into project when closing selected session
</label>
<div class="inline-actions">
<button id="session-create" type="button">Create Session</button>
<button id="session-close" type="button">Close Selected Session</button>
</div>
</form>
<div class="divider"></div>
<h3>Node Inspector</h3>
<div id="node-inspector" class="inspector empty">Select a graph node.</div>
@@ -109,6 +130,24 @@
<div id="event-feed" class="event-feed"></div>
</section>
<section class="panel claude-panel">
<div class="panel-head">
<h2>Claude Trace</h2>
<div class="panel-actions">
<label>
Limit
<select id="claude-events-limit">
<option value="80">80</option>
<option value="150" selected>150</option>
<option value="300">300</option>
</select>
</label>
<button id="claude-events-refresh" type="button">Refresh</button>
</div>
</div>
<div id="claude-event-feed" class="event-feed claude-event-feed"></div>
</section>
<section class="panel history-panel">
<div class="panel-head">
<h2>Run History</h2>
@@ -192,6 +231,7 @@
<label>AGENT_TOPOLOGY_MAX_DEPTH<input id="cfg-topology-depth" type="number" min="1" /></label>
<label>AGENT_TOPOLOGY_MAX_RETRIES<input id="cfg-topology-retries" type="number" min="0" /></label>
<label>AGENT_RELATIONSHIP_MAX_CHILDREN<input id="cfg-relationship-children" type="number" min="1" /></label>
<label>AGENT_MERGE_CONFLICT_MAX_ATTEMPTS<input id="cfg-merge-conflict-attempts" type="number" min="1" /></label>
<label>AGENT_PORT_BASE<input id="cfg-port-base" type="number" min="1" /></label>
<label>AGENT_PORT_BLOCK_SIZE<input id="cfg-port-block-size" type="number" min="1" /></label>
<label>AGENT_PORT_BLOCK_COUNT<input id="cfg-port-block-count" type="number" min="1" /></label>


@@ -79,7 +79,8 @@ p {
grid-template-columns: minmax(0, 2fr) minmax(280px, 1fr);
grid-template-areas:
"graph side"
"feed history"
"feed claude"
"history history"
"config config";
}
@@ -129,6 +130,10 @@ p {
grid-area: history;
}
.claude-panel {
grid-area: claude;
}
.config-panel {
grid-area: config;
}
@@ -142,6 +147,12 @@ label {
letter-spacing: 0.015em;
}
label.inline-checkbox {
flex-direction: row;
align-items: center;
gap: 0.45rem;
}
input,
select,
textarea,
@@ -308,6 +319,14 @@ button.danger {
color: var(--critical);
}
.claude-event-feed .event-row {
grid-template-columns: 110px 150px 1fr;
}
.claude-event-feed .event-type {
font-size: 0.7rem;
}
.history-table {
width: 100%;
border-collapse: collapse;
@@ -353,6 +372,22 @@ button.danger {
border-color: rgba(255, 201, 74, 0.6);
}
.status-active {
color: var(--accent-cool);
border-color: rgba(86, 195, 255, 0.6);
}
.status-suspended,
.status-closed_with_conflicts {
color: var(--warn);
border-color: rgba(255, 201, 74, 0.6);
}
.status-closed {
color: var(--muted);
border-color: rgba(155, 184, 207, 0.45);
}
.status-unknown {
color: var(--muted);
border-color: rgba(155, 184, 207, 0.45);
@@ -463,6 +498,7 @@ button.danger {
"graph"
"side"
"feed"
"claude"
"history"
"config";
}


@@ -3,11 +3,17 @@ import { mkdir, readFile, writeFile } from "node:fs/promises";
import { resolve } from "node:path";
import { SchemaDrivenExecutionEngine } from "../agents/orchestration.js";
import { parseAgentManifest, type AgentManifest } from "../agents/manifest.js";
import { FileSystemProjectContextStore } from "../agents/project-context.js";
import type {
ActorExecutionResult,
ActorExecutor,
PipelineAggregateStatus,
} from "../agents/pipeline.js";
import {
FileSystemSessionMetadataStore,
SessionWorktreeManager,
type SessionMetadata,
} from "../agents/session-lifecycle.js";
import { loadConfig, type AppConfig } from "../config.js";
import { parseEnvFile } from "./env-store.js";
import {
@@ -240,6 +246,19 @@ async function loadRuntimeConfig(envPath: string): Promise<Readonly<AppConfig>>
});
}
function resolveRuntimePaths(input: {
workspaceRoot: string;
config: Readonly<AppConfig>;
}): {
stateRoot: string;
worktreeRoot: string;
} {
return {
stateRoot: resolve(input.workspaceRoot, input.config.orchestration.stateRoot),
worktreeRoot: resolve(input.workspaceRoot, input.config.provisioning.gitWorktree.rootDirectory),
};
}
async function writeRunMeta(input: {
stateRoot: string;
sessionId: string;
@@ -319,6 +338,102 @@ export class UiRunService {
this.envFilePath = resolve(this.workspaceRoot, input.envFilePath ?? ".env");
}
private async loadRuntime(): Promise<{
config: Readonly<AppConfig>;
stateRoot: string;
sessionStore: FileSystemSessionMetadataStore;
worktreeManager: SessionWorktreeManager;
}> {
const config = await loadRuntimeConfig(this.envFilePath);
const paths = resolveRuntimePaths({
workspaceRoot: this.workspaceRoot,
config,
});
return {
config,
stateRoot: paths.stateRoot,
sessionStore: new FileSystemSessionMetadataStore({
stateRoot: paths.stateRoot,
}),
worktreeManager: new SessionWorktreeManager({
worktreeRoot: paths.worktreeRoot,
baseRef: config.provisioning.gitWorktree.baseRef,
}),
};
}
async createSession(input: {
projectPath: string;
sessionId?: string;
}): Promise<SessionMetadata> {
const runtime = await this.loadRuntime();
const sessionId = input.sessionId?.trim() || toSessionId();
const baseWorkspacePath = runtime.worktreeManager.resolveBaseWorkspacePath(sessionId);
const session = await runtime.sessionStore.createSession({
sessionId,
projectPath: resolve(input.projectPath),
baseWorkspacePath,
});
await runtime.worktreeManager.initializeSessionBaseWorkspace({
sessionId: session.sessionId,
projectPath: session.projectPath,
baseWorkspacePath: session.baseWorkspacePath,
});
return session;
}
async listSessions(): Promise<SessionMetadata[]> {
const runtime = await this.loadRuntime();
return runtime.sessionStore.listSessions();
}
async readSession(sessionId: string): Promise<SessionMetadata | undefined> {
const runtime = await this.loadRuntime();
return runtime.sessionStore.readSession(sessionId);
}
async closeSession(input: {
sessionId: string;
mergeToProject?: boolean;
}): Promise<SessionMetadata> {
const runtime = await this.loadRuntime();
const session = await runtime.sessionStore.readSession(input.sessionId);
if (!session) {
throw new Error(`Session "${input.sessionId}" does not exist.`);
}
const sessionProjectContextStore = new FileSystemProjectContextStore({
filePath: runtime.sessionStore.getSessionProjectContextPath(session.sessionId),
});
const projectContext = await sessionProjectContextStore.readState();
const taskWorktreePaths = projectContext.taskQueue
.map((task) => task.worktreePath)
.filter((path): path is string => typeof path === "string" && path.trim().length > 0);
const outcome = await runtime.worktreeManager.closeSession({
session,
taskWorktreePaths,
mergeBaseIntoProject: input.mergeToProject === true,
});
if (outcome.kind === "fatal_error") {
throw new Error(`Session close failed: ${outcome.error}`);
}
if (outcome.kind === "conflict") {
return runtime.sessionStore.updateSession(session.sessionId, {
sessionStatus: "closed_with_conflicts",
});
}
return runtime.sessionStore.updateSession(session.sessionId, {
sessionStatus: "closed",
});
}
listRuns(): RunRecord[] {
const output = [...this.runHistory.values()].sort((left, right) => {
return right.startedAt.localeCompare(left.startedAt);
@@ -331,11 +446,24 @@ export class UiRunService {
}
async startRun(input: StartRunInput): Promise<RunRecord> {
const config = await loadRuntimeConfig(this.envFilePath);
const runtime = await this.loadRuntime();
const config = runtime.config;
const manifest = parseAgentManifest(input.manifest);
const executionMode = input.executionMode ?? "mock";
const provider = input.provider ?? "codex";
const requestedSessionId = input.sessionId?.trim();
const sessionId = requestedSessionId || toSessionId();
const session = requestedSessionId
? await runtime.sessionStore.readSession(sessionId)
: undefined;
if (requestedSessionId && !session) {
throw new Error(`Session "${sessionId}" does not exist.`);
}
if (
session &&
(session.sessionStatus === "closed" || session.sessionStatus === "closed_with_conflicts")
) {
throw new Error(`Session "${sessionId}" is closed and cannot run new tasks.`);
}
const runId = randomUUID();
const controller = new AbortController();
@@ -359,6 +487,8 @@ export class UiRunService {
provider,
initialPrompt: input.prompt,
config,
projectPath: session?.baseWorkspacePath ?? this.workspaceRoot,
observabilityRootPath: this.workspaceRoot,
});
}
@@ -376,11 +506,20 @@ export class UiRunService {
actorExecutors,
settings: {
workspaceRoot: this.workspaceRoot,
stateRoot: config.orchestration.stateRoot,
projectContextPath: config.orchestration.projectContextPath,
stateRoot: runtime.stateRoot,
projectContextPath: session
? runtime.sessionStore.getSessionProjectContextPath(sessionId)
: resolve(this.workspaceRoot, config.orchestration.projectContextPath),
runtimeContext: {
ui_mode: executionMode,
run_provider: provider,
...(session
? {
session_id: sessionId,
project_path: session.projectPath,
base_workspace_path: session.baseWorkspacePath,
}
: {}),
...(input.runtimeContextOverrides ?? {}),
},
},
@@ -388,7 +527,7 @@ export class UiRunService {
});
await writeRunMeta({
stateRoot: config.orchestration.stateRoot,
stateRoot: runtime.stateRoot,
sessionId,
run: record,
});
@@ -404,6 +543,7 @@ export class UiRunService {
},
},
signal: controller.signal,
...(session ? { sessionMetadata: session } : {}),
});
const completedRecord = this.runHistory.get(runId);
@@ -419,7 +559,7 @@ export class UiRunService {
this.runHistory.set(runId, next);
await writeRunMeta({
stateRoot: config.orchestration.stateRoot,
stateRoot: runtime.stateRoot,
sessionId,
run: next,
});
@@ -439,7 +579,7 @@ export class UiRunService {
this.runHistory.set(runId, next);
await writeRunMeta({
stateRoot: config.orchestration.stateRoot,
stateRoot: runtime.stateRoot,
sessionId,
run: next,
});

View File

@@ -6,6 +6,7 @@ import { buildSessionGraphInsight, buildSessionSummaries } from "./session-insig
import { UiConfigStore, type LimitSettings, type RuntimeNotificationSettings, type SecurityPolicySettings } from "./config-store.js";
import { ManifestStore } from "./manifest-store.js";
import { filterRuntimeEvents, readRuntimeEvents } from "./runtime-events-store.js";
import { filterClaudeTraceEvents, readClaudeTraceEvents } from "./claude-trace-store.js";
import { parseJsonBody, sendJson, methodNotAllowed, notFound, serveStaticFile } from "./http-utils.js";
import { readRunMetaBySession, UiRunService, type RunExecutionMode } from "./run-service.js";
import type { RunProvider } from "./provider-executor.js";
@@ -23,6 +24,14 @@ type StartRunRequest = {
provider?: RunProvider;
};
type CreateSessionRequest = {
projectPath: string;
};
type CloseSessionRequest = {
mergeToProject?: boolean;
};
function parsePort(value: string | undefined): number {
const parsed = Number(value ?? "4317");
if (!Number.isInteger(parsed) || parsed < 1 || parsed > 65535) {
@@ -102,14 +111,23 @@ function ensureProvider(value: unknown): RunProvider {
return value === "claude" ? "claude" : "codex";
}
function ensureNonEmptyString(value: unknown, field: string): string {
if (typeof value !== "string" || value.trim().length === 0) {
throw new Error(`Field "${field}" is required.`);
}
return value.trim();
}
async function readRuntimePaths(configStore: UiConfigStore, workspaceRoot: string): Promise<{
stateRoot: string;
runtimeEventLogPath: string;
claudeTraceLogPath: string;
}> {
const snapshot = await configStore.readSnapshot();
return {
stateRoot: resolve(workspaceRoot, snapshot.paths.stateRoot),
runtimeEventLogPath: resolve(workspaceRoot, snapshot.paths.runtimeEventLogPath),
claudeTraceLogPath: resolve(workspaceRoot, snapshot.paths.claudeTraceLogPath),
};
}
@@ -298,7 +316,42 @@ async function handleApiRequest(input: {
return true;
}
if (pathname === "/api/claude-trace") {
if (method !== "GET") {
methodNotAllowed(response);
return true;
}
const { claudeTraceLogPath } = await readRuntimePaths(configStore, workspaceRoot);
const limit = parseLimit(requestUrl.searchParams.get("limit"), 200);
const sessionId = requestUrl.searchParams.get("sessionId") ?? undefined;
const events = filterClaudeTraceEvents(await readClaudeTraceEvents(claudeTraceLogPath), {
...(sessionId ? { sessionId } : {}),
limit,
});
sendJson(response, 200, {
ok: true,
events,
});
return true;
}
if (pathname === "/api/sessions") {
if (method === "POST") {
const body = await parseJsonBody<CreateSessionRequest>(request);
const projectPath = ensureNonEmptyString(body.projectPath, "projectPath");
const session = await runService.createSession({
projectPath,
});
sendJson(response, 201, {
ok: true,
session,
});
return true;
}
if (method !== "GET") {
methodNotAllowed(response);
return true;
@@ -309,10 +362,12 @@ async function handleApiRequest(input: {
stateRoot,
runtimeEventLogPath,
});
const metadata = await runService.listSessions();
sendJson(response, 200, {
ok: true,
sessions,
sessionMetadata: metadata,
runs: runService.listRuns(),
});
return true;
@@ -362,6 +417,118 @@ async function handleApiRequest(input: {
return true;
}
if (pathname.startsWith("/api/sessions/") && pathname.endsWith("/run")) {
if (method !== "POST") {
methodNotAllowed(response);
return true;
}
const sessionId = toRelativePathFromApi(pathname.slice("/api/sessions/".length, -"/run".length));
if (!sessionId) {
sendJson(response, 400, {
ok: false,
error: "Session id is required.",
});
return true;
}
const body = await parseJsonBody<StartRunRequest>(request);
if (typeof body.prompt !== "string" || body.prompt.trim().length === 0) {
sendJson(response, 400, {
ok: false,
error: 'Field "prompt" is required.',
});
return true;
}
// Prefer an inline manifest; otherwise fall back to a non-empty manifestPath.
const resolvedManifest =
body.manifest !== undefined
? body.manifest
: typeof body.manifestPath === "string" && body.manifestPath.trim().length > 0
? body.manifestPath
: undefined;
let manifest: unknown;
if (typeof resolvedManifest === "string") {
manifest = (await manifestStore.read(resolvedManifest)).source;
} else if (resolvedManifest !== undefined) {
manifest = resolvedManifest;
}
if (!manifest) {
sendJson(response, 400, {
ok: false,
error: "A manifest or manifestPath is required to start a run.",
});
return true;
}
const record = await runService.startRun({
prompt: body.prompt,
manifest,
manifestPath: body.manifestPath,
sessionId,
topologyHint: body.topologyHint,
initialFlags: ensureBooleanRecord(body.initialFlags),
runtimeContextOverrides: ensureRuntimeContext(body.runtimeContextOverrides),
simulateValidationNodeIds: ensureStringArray(body.simulateValidationNodeIds),
executionMode: ensureExecutionMode(body.executionMode),
provider: ensureProvider(body.provider),
});
sendJson(response, 202, {
ok: true,
run: record,
});
return true;
}
if (pathname.startsWith("/api/sessions/") && pathname.endsWith("/close")) {
if (method !== "POST") {
methodNotAllowed(response);
return true;
}
const sessionId = toRelativePathFromApi(pathname.slice("/api/sessions/".length, -"/close".length));
if (!sessionId) {
sendJson(response, 400, {
ok: false,
error: "Session id is required.",
});
return true;
}
let body: CloseSessionRequest = {};
try {
body = await parseJsonBody<CloseSessionRequest>(request);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (message !== "Request body is required.") {
throw error;
}
}
const session = await runService.closeSession({
sessionId,
mergeToProject: body.mergeToProject === true,
});
sendJson(response, 200, {
ok: true,
session,
});
return true;
}
if (pathname === "/api/runs") {
if (method === "GET") {
sendJson(response, 200, {

test_run.md Normal file
View File

@@ -0,0 +1,49 @@
For this repo, a “test run” can mean 3 different things. Configure based on which one you want.
1. npm run verify (typecheck + tests + build)
- Required:
- Node + npm
- Dependencies installed: npm install
- Not required:
- API keys
- Command:
- npm run verify
2. UI dry run (no external model calls, safest first run)
- Required:
- npm install
- A valid agent manifest containing: topologies, personas, relationships, topologyConstraints
- pipeline with entryNodeId, nodes, edges
- Command:
- npm run ui
3. Provider-backed run (Codex/Claude via CLI or UI executionMode=provider)
- Required:
- Everything in #2
- Auth for chosen provider:
- Codex/OpenAI: OPENAI_AUTH_MODE + (CODEX_API_KEY or OPENAI_API_KEY) or existing Codex login
- Claude: CLAUDE_CODE_OAUTH_TOKEN (preferred) or ANTHROPIC_API_KEY or existing Claude login
- git available and workspace is a valid git repo (runtime provisions git worktrees)
- Optional:
- mcp.config.json (a missing file is tolerated when MCP_CONFIG_PATH is left at its default)
- Important:
- If you set a custom MCP_CONFIG_PATH, that file must exist.
Environment groups you can tune (defaults already exist)
- Provider/auth: keys, auth mode, base URL, model, MCP path
- Limits: AGENT_MAX_*, AGENT_TOPOLOGY_*, AGENT_RELATIONSHIP_MAX_CHILDREN
- Provisioning: worktree root/base ref, port range/locks, discovery relative path
- Security: violation mode, allowlisted binaries, timeout, audit path, env inherit/scrub
- Telemetry/UI: runtime event log + Discord settings, AGENT_UI_HOST/PORT
- Do not set runtime-injected vars manually (AGENT_WORKTREE_PATH, AGENT_PORT_RANGE_START, etc.).
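As a sketch only (variable names come from `.env.example`; the values are placeholders, not defaults), a minimal `.env` for a provider-backed Claude run with stdout tracing might look like:

```
CLAUDE_CODE_OAUTH_TOKEN=<your-oauth-token>
CLAUDE_OBSERVABILITY_MODE=stdout
CLAUDE_OBSERVABILITY_VERBOSITY=summary
AGENT_MAX_CONCURRENT=4
```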
Practical first-run sequence
1. npm install
2. cp .env.example .env
3. npm run verify
4. npm run ui
5. Start a run in mock mode with a valid manifest
6. Switch to provider mode after auth is confirmed
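The session lifecycle used in steps 5–6 (create → run → close) can be sketched as request descriptors. This helper is illustrative only: the paths and body fields mirror the `/api/sessions` handlers, but the helper itself is not part of the runtime.

```typescript
// Illustrative only: builds the HTTP calls for the explicit session lifecycle.
// Paths and body fields mirror the /api/sessions handlers in server.ts.
type SessionLifecycleCall = {
  method: "POST";
  path: string;
  body: Record<string, unknown>;
};

function buildSessionLifecycleCalls(input: {
  projectPath: string;
  sessionId: string;
  prompt: string;
  manifestPath: string;
  mergeToProject?: boolean;
}): SessionLifecycleCall[] {
  return [
    // 1. Create a session bound to a target project path.
    { method: "POST", path: "/api/sessions", body: { projectPath: input.projectPath } },
    // 2. Start a run inside the session (mock mode needs no provider auth).
    {
      method: "POST",
      path: `/api/sessions/${input.sessionId}/run`,
      body: { prompt: input.prompt, manifestPath: input.manifestPath, executionMode: "mock" },
    },
    // 3. Close the session, optionally merging the base workspace into the project.
    {
      method: "POST",
      path: `/api/sessions/${input.sessionId}/close`,
      body: { mergeToProject: input.mergeToProject === true },
    },
  ];
}
```

Actually issuing these calls (e.g. with `fetch` against `AGENT_UI_HOST`/`AGENT_UI_PORT`) is left to the client; the run endpoint responds 202 with the run record, and close responds 200 with the updated session metadata.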

View File

@@ -0,0 +1,296 @@
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtemp, readFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { SDKMessage } from "@anthropic-ai/claude-agent-sdk";
import { ClaudeObservabilityLogger, summarizeClaudeMessage } from "../src/ui/claude-observability.js";
test("summarizeClaudeMessage returns compact result metadata in summary mode", () => {
const message = {
type: "result",
subtype: "success",
stop_reason: "end_turn",
num_turns: 1,
total_cost_usd: 0.0012,
usage: {
input_tokens: 120,
output_tokens: 40,
},
result: "{\"status\":\"success\"}",
duration_ms: 40,
duration_api_ms: 32,
is_error: false,
modelUsage: {},
permission_denials: [],
uuid: "uuid-1",
session_id: "sdk-session-1",
} as unknown as SDKMessage;
const summary = summarizeClaudeMessage(message, "summary");
assert.equal(summary.messageType, "result");
assert.equal(summary.messageSubtype, "success");
assert.equal(summary.sdkSessionId, "sdk-session-1");
assert.equal(summary.summary, "Claude query result success.");
assert.equal(summary.data?.numTurns, 1);
const usage = summary.data?.usage as Record<string, unknown> | undefined;
assert.equal(usage?.input_tokens, 120);
});
test("summarizeClaudeMessage redacts sensitive fields in full mode", () => {
const message = {
type: "system",
subtype: "init",
session_id: "sdk-session-2",
uuid: "uuid-2",
apiKey: "top-secret",
nested: {
authToken: "really-secret",
ok: true,
},
} as unknown as SDKMessage;
const summary = summarizeClaudeMessage(message, "full");
const payload = summary.data?.message as Record<string, unknown> | undefined;
const nested = payload?.nested as Record<string, unknown> | undefined;
assert.equal(summary.messageType, "system");
assert.equal(summary.messageSubtype, "init");
assert.equal(payload?.apiKey, "[redacted]");
assert.equal(nested?.authToken, "[redacted]");
assert.equal(nested?.ok, true);
});
test("ClaudeObservabilityLogger samples tool_progress messages for stdout", () => {
const lines: string[] = [];
const originalLog = console.log;
const originalNow = Date.now;
let now = 1000;
console.log = (line?: unknown) => {
lines.push(String(line ?? ""));
};
Date.now = () => now;
try {
const logger = new ClaudeObservabilityLogger({
workspaceRoot: process.cwd(),
config: {
mode: "stdout",
verbosity: "summary",
logPath: ".ai_ops/events/claude-trace.ndjson",
includePartialMessages: false,
debug: false,
},
});
const context = {
sessionId: "session-a",
nodeId: "node-a",
attempt: 1,
depth: 0,
};
const makeMessage = (): SDKMessage =>
({
type: "tool_progress",
tool_name: "Bash",
tool_use_id: "tool-1",
parent_tool_use_id: null,
elapsed_time_seconds: 1,
uuid: "uuid-tool",
session_id: "sdk-session-tool",
}) as unknown as SDKMessage;
logger.recordMessage({
context,
message: makeMessage(),
});
now += 300;
logger.recordMessage({
context,
message: makeMessage(),
});
now += 1200;
logger.recordMessage({
context,
message: makeMessage(),
});
assert.equal(lines.length, 2);
assert.match(lines[0] ?? "", /^\[claude-trace\] /);
assert.match(lines[1] ?? "", /"suppressedSinceLastEmit":1/);
} finally {
console.log = originalLog;
Date.now = originalNow;
}
});
test("ClaudeObservabilityLogger keeps assistant/user message records in file output", async () => {
const workspace = await mkdtemp(join(tmpdir(), "claude-obsv-test-"));
const logPath = ".ai_ops/events/claude-trace.ndjson";
const logger = new ClaudeObservabilityLogger({
workspaceRoot: workspace,
config: {
mode: "file",
verbosity: "summary",
logPath,
includePartialMessages: false,
debug: false,
},
});
const context = {
sessionId: "session-file",
nodeId: "node-file",
attempt: 1,
depth: 0,
};
logger.recordQueryStarted({
context,
});
logger.recordMessage({
context,
message: {
type: "assistant",
uuid: "assistant-1",
session_id: "sdk-file-1",
parent_tool_use_id: null,
message: {} as never,
} as unknown as SDKMessage,
});
logger.recordMessage({
context,
message: {
type: "user",
uuid: "user-1",
session_id: "sdk-file-1",
parent_tool_use_id: null,
message: {} as never,
} as unknown as SDKMessage,
});
logger.recordMessage({
context,
message: {
type: "result",
subtype: "success",
stop_reason: "end_turn",
num_turns: 1,
total_cost_usd: 0.0012,
usage: {
input_tokens: 100,
output_tokens: 20,
},
result: "{}",
duration_ms: 10,
duration_api_ms: 9,
is_error: false,
modelUsage: {},
permission_denials: [],
uuid: "result-1",
session_id: "sdk-file-1",
} as unknown as SDKMessage,
});
logger.recordQueryCompleted({
context,
});
await logger.close();
const filePath = join(workspace, logPath);
const content = await readFile(filePath, "utf8");
const lines = content.split(/\r?\n/).filter((line) => line.trim().length > 0);
const records = lines.map((line) => JSON.parse(line) as Record<string, unknown>);
const messageTypes = records
.map((record) => record.sdkMessageType)
.filter((value) => typeof value === "string");
assert.equal(messageTypes.includes("assistant"), true);
assert.equal(messageTypes.includes("user"), true);
assert.equal(messageTypes.includes("result"), true);
});
test("summarizeClaudeMessage maps task_notification system subtype", () => {
const message = {
type: "system",
subtype: "task_notification",
task_id: "task-1",
status: "completed",
output_file: "/tmp/out.txt",
summary: "Task complete",
uuid: "uuid-task",
session_id: "sdk-session-task",
} as unknown as SDKMessage;
const summary = summarizeClaudeMessage(message, "summary");
assert.equal(summary.messageType, "system");
assert.equal(summary.messageSubtype, "task_notification");
assert.equal(summary.summary, "Task notification: completed.");
assert.equal(summary.data?.taskId, "task-1");
});
test("ClaudeObservabilityLogger honors includePartialMessages for stream events", () => {
const lines: string[] = [];
const originalLog = console.log;
console.log = (line?: unknown) => {
lines.push(String(line ?? ""));
};
try {
const context = {
sessionId: "session-stream",
nodeId: "node-stream",
attempt: 1,
depth: 0,
};
const streamMessage = {
type: "stream_event",
event: {
type: "content_block_delta",
},
parent_tool_use_id: null,
uuid: "stream-1",
session_id: "sdk-session-stream",
} as unknown as SDKMessage;
const withoutPartial = new ClaudeObservabilityLogger({
workspaceRoot: process.cwd(),
config: {
mode: "stdout",
verbosity: "summary",
logPath: ".ai_ops/events/claude-trace.ndjson",
includePartialMessages: false,
debug: false,
},
});
withoutPartial.recordMessage({
context,
message: streamMessage,
});
const withPartial = new ClaudeObservabilityLogger({
workspaceRoot: process.cwd(),
config: {
mode: "stdout",
verbosity: "summary",
logPath: ".ai_ops/events/claude-trace.ndjson",
includePartialMessages: true,
debug: false,
},
});
withPartial.recordMessage({
context,
message: streamMessage,
});
assert.equal(lines.length, 1);
assert.match(lines[0] ?? "", /\"sdkMessageType\":\"stream_event\"/);
} finally {
console.log = originalLog;
}
});

View File

@@ -0,0 +1,42 @@
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtemp, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { filterClaudeTraceEvents, readClaudeTraceEvents } from "../src/ui/claude-trace-store.js";
test("readClaudeTraceEvents parses and sorts ndjson records", async () => {
const workspace = await mkdtemp(join(tmpdir(), "claude-trace-store-"));
const logPath = join(workspace, "claude-trace.ndjson");
await writeFile(
logPath,
[
'{"timestamp":"2026-02-24T17:27:05.000Z","message":"later","sessionId":"s1"}',
'not-json',
'{"timestamp":"2026-02-24T17:26:00.000Z","message":"earlier","sessionId":"s1"}',
'{"message":"missing timestamp"}',
].join("\n"),
"utf8",
);
const events = await readClaudeTraceEvents(logPath);
assert.equal(events.length, 2);
assert.equal(events[0]?.message, "earlier");
assert.equal(events[1]?.message, "later");
});
test("filterClaudeTraceEvents filters by session and limit", () => {
const events = [
{ timestamp: "2026-02-24T17:00:00.000Z", message: "a", sessionId: "s1" },
{ timestamp: "2026-02-24T17:01:00.000Z", message: "b", sessionId: "s2" },
{ timestamp: "2026-02-24T17:02:00.000Z", message: "c", sessionId: "s1" },
];
const filtered = filterClaudeTraceEvents(events, {
sessionId: "s1",
limit: 1,
});
assert.equal(filtered.length, 1);
assert.equal(filtered[0]?.message, "c");
});

View File

@@ -12,6 +12,7 @@ test("loads defaults and freezes config", () => {
assert.equal(config.agentManager.maxConcurrentAgents, 4);
assert.equal(config.orchestration.maxDepth, 4);
assert.equal(config.orchestration.mergeConflictMaxAttempts, 2);
assert.equal(config.provisioning.portRange.basePort, 36000);
assert.equal(config.discovery.fileRelativePath, ".agent-context/resources.json");
assert.equal(config.security.violationHandling, "hard_abort");
@@ -24,6 +25,11 @@ test("loads defaults and freezes config", () => {
"session.failed",
]);
assert.equal(config.provider.openAiAuthMode, "auto");
assert.equal(config.provider.claudeObservability.mode, "off");
assert.equal(config.provider.claudeObservability.verbosity, "summary");
assert.equal(config.provider.claudeObservability.logPath, ".ai_ops/events/claude-trace.ndjson");
assert.equal(config.provider.claudeObservability.includePartialMessages, false);
assert.equal(config.provider.claudeObservability.debug, false);
assert.equal(Object.isFrozen(config), true);
assert.equal(Object.isFrozen(config.orchestration), true);
});
@@ -56,6 +62,38 @@ test("validates runtime discord severity mode", () => {
);
});
test("validates claude observability mode", () => {
assert.throws(
() => loadConfig({ CLAUDE_OBSERVABILITY_MODE: "stream" }),
/CLAUDE_OBSERVABILITY_MODE must be one of/,
);
});
test("validates claude observability verbosity", () => {
assert.throws(
() => loadConfig({ CLAUDE_OBSERVABILITY_VERBOSITY: "verbose" }),
/CLAUDE_OBSERVABILITY_VERBOSITY must be one of/,
);
});
test("loads claude observability settings", () => {
const config = loadConfig({
CLAUDE_OBSERVABILITY_MODE: "both",
CLAUDE_OBSERVABILITY_VERBOSITY: "full",
CLAUDE_OBSERVABILITY_LOG_PATH: ".ai_ops/debug/claude.ndjson",
CLAUDE_OBSERVABILITY_INCLUDE_PARTIAL: "true",
CLAUDE_OBSERVABILITY_DEBUG: "true",
CLAUDE_OBSERVABILITY_DEBUG_LOG_PATH: ".ai_ops/debug/claude-sdk.log",
});
assert.equal(config.provider.claudeObservability.mode, "both");
assert.equal(config.provider.claudeObservability.verbosity, "full");
assert.equal(config.provider.claudeObservability.logPath, ".ai_ops/debug/claude.ndjson");
assert.equal(config.provider.claudeObservability.includePartialMessages, true);
assert.equal(config.provider.claudeObservability.debug, true);
assert.equal(config.provider.claudeObservability.debugLogPath, ".ai_ops/debug/claude-sdk.log");
});
test("prefers CLAUDE_CODE_OAUTH_TOKEN over ANTHROPIC_API_KEY", () => {
const config = loadConfig({
CLAUDE_CODE_OAUTH_TOKEN: "oauth-token",
@@ -127,3 +165,10 @@ test("validates AGENT_WORKTREE_TARGET_PATH against parent traversal", () => {
/must not contain "\.\." path segments/,
);
});
test("validates AGENT_MERGE_CONFLICT_MAX_ATTEMPTS bounds", () => {
assert.throws(
() => loadConfig({ AGENT_MERGE_CONFLICT_MAX_ATTEMPTS: "0" }),
/AGENT_MERGE_CONFLICT_MAX_ATTEMPTS must be an integer >= 1/,
);
});

View File

@@ -614,6 +614,7 @@ test("runs parallel topology blocks concurrently and routes via domain-event edg
projectContextPatch: {
enqueueTasks: [
{
taskId: "task-integrate",
id: "task-integrate",
title: "Integrate feature branches",
status: "pending",
@@ -939,6 +940,86 @@ test("propagates abort signal into actor execution and stops the run", async ()
assert.equal(observedAbort, true);
});
test("createClaudeCanUseTool accepts tool casing differences from providers", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));
const projectContextPath = resolve(stateRoot, "project-context.json");
const manifest = {
schemaVersion: "1",
topologies: ["sequential"],
personas: [
{
id: "coder",
displayName: "Coder",
systemPromptTemplate: "Coder",
toolClearance: {
allowlist: ["bash"],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 2,
maxRetries: 0,
},
pipeline: {
entryNodeId: "case-node",
nodes: [
{
id: "case-node",
actorId: "case_actor",
personaId: "coder",
},
],
edges: [],
},
} as const;
const engine = new SchemaDrivenExecutionEngine({
manifest,
settings: {
workspaceRoot,
stateRoot,
projectContextPath,
maxChildren: 1,
maxDepth: 2,
maxRetries: 0,
runtimeContext: {},
},
actorExecutors: {
case_actor: async (input) => {
const canUseTool = input.mcp.createClaudeCanUseTool();
const allow = await canUseTool("Bash", {}, {
signal: new AbortController().signal,
toolUseID: "allow-bash",
});
assert.deepEqual(allow, {
behavior: "allow",
toolUseID: "allow-bash",
});
return {
status: "success",
payload: {
ok: true,
},
};
},
},
});
const result = await engine.runSession({
sessionId: "session-claude-tool-casing",
initialPayload: {
task: "verify tool casing",
},
});
assert.equal(result.status, "success");
});
test("hard-aborts pipeline on security violations by default", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-workspace-"));
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-state-"));

View File

@@ -28,6 +28,7 @@ test("project context store reads defaults and applies domain patches", async ()
},
enqueueTasks: [
{
taskId: "task-1",
id: "task-1",
title: "Build parser",
status: "pending",
@@ -38,11 +39,13 @@ test("project context store reads defaults and applies domain patches", async ()
const updated = await store.patchState({
upsertTasks: [
{
taskId: "task-1",
id: "task-1",
title: "Build parser",
status: "in_progress",
},
{
taskId: "task-2",
id: "task-2",
title: "Add tests",
status: "pending",
@@ -59,6 +62,35 @@ test("project context store reads defaults and applies domain patches", async ()
assert.equal(updated.schemaVersion, 1);
});
test("project context accepts conflict-aware task statuses", async () => {
const root = await mkdtemp(resolve(tmpdir(), "ai-ops-project-context-conflict-"));
const store = new FileSystemProjectContextStore({
filePath: resolve(root, "project-context.json"),
});
const updated = await store.patchState({
upsertTasks: [
{
taskId: "task-conflict",
id: "task-conflict",
title: "Resolve merge conflict",
status: "conflict",
},
{
taskId: "task-resolving",
id: "task-resolving",
title: "Retry merge",
status: "resolving_conflict",
},
],
});
assert.deepEqual(
updated.taskQueue.map((task) => `${task.taskId}:${task.status}`),
["task-conflict:conflict", "task-resolving:resolving_conflict"],
);
});
test("project context parser merges missing root keys with defaults", async () => {
const root = await mkdtemp(resolve(tmpdir(), "ai-ops-project-context-"));
const filePath = resolve(root, "project-context.json");
@@ -70,6 +102,7 @@ test("project context parser merges missing root keys with defaults", async () =
{
taskQueue: [
{
taskId: "task-1",
id: "task-1",
title: "Migrate",
status: "pending",

View File

@@ -1,10 +1,14 @@
import test from "node:test";
import assert from "node:assert/strict";
import { mkdtemp, writeFile } from "node:fs/promises";
import { execFile } from "node:child_process";
import { mkdtemp, mkdir, stat, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { resolve } from "node:path";
import { promisify } from "node:util";
import { UiRunService, readRunMetaBySession } from "../src/ui/run-service.js";
const execFileAsync = promisify(execFile);
async function waitForTerminalRun(
runService: UiRunService,
runId: string,
@@ -94,3 +98,140 @@ test("run service persists failure when pipeline summary is failure", async () =
});
assert.equal(persisted?.status, "failure");
});
test("run service creates, runs, and closes explicit sessions", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-run-service-session-"));
const stateRoot = resolve(workspaceRoot, "state");
const envPath = resolve(workspaceRoot, ".env");
const projectPath = resolve(workspaceRoot, "project");
await mkdir(projectPath, { recursive: true });
await execFileAsync("git", ["init", projectPath], { encoding: "utf8" });
await execFileAsync("git", ["-C", projectPath, "config", "user.name", "AI Ops"], { encoding: "utf8" });
await execFileAsync("git", ["-C", projectPath, "config", "user.email", "ai-ops@example.local"], { encoding: "utf8" });
await writeFile(resolve(projectPath, "README.md"), "# project\n", "utf8");
await execFileAsync("git", ["-C", projectPath, "add", "README.md"], { encoding: "utf8" });
await execFileAsync("git", ["-C", projectPath, "commit", "-m", "initial"], { encoding: "utf8" });
await writeFile(
envPath,
[
`AGENT_STATE_ROOT=${stateRoot}`,
"AGENT_WORKTREE_ROOT=.ai_ops/worktrees",
"AGENT_WORKTREE_BASE_REF=HEAD",
].join("\n"),
"utf8",
);
const runService = new UiRunService({
workspaceRoot,
envFilePath: ".env",
});
const createdSession = await runService.createSession({
projectPath,
});
assert.equal(createdSession.sessionStatus, "active");
const manifest = {
schemaVersion: "1",
topologies: ["sequential"],
personas: [
{
id: "writer",
displayName: "Writer",
systemPromptTemplate: "Write draft",
toolClearance: {
allowlist: ["read_file", "write_file"],
banlist: [],
},
},
],
relationships: [],
topologyConstraints: {
maxDepth: 1,
maxRetries: 0,
},
pipeline: {
entryNodeId: "write-node",
nodes: [
{
id: "write-node",
actorId: "writer-actor",
personaId: "writer",
},
],
edges: [],
},
};
const started = await runService.startRun({
prompt: "complete task",
manifest,
sessionId: createdSession.sessionId,
executionMode: "mock",
});
const terminalStatus = await waitForTerminalRun(runService, started.runId);
assert.equal(terminalStatus, "success");
const closed = await runService.closeSession({
sessionId: createdSession.sessionId,
});
assert.equal(closed.sessionStatus, "closed");
await assert.rejects(() => stat(createdSession.baseWorkspacePath), {
code: "ENOENT",
});
});
test("run service marks session closed_with_conflicts when close merge conflicts", async () => {
const workspaceRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-run-service-close-conflict-"));
const stateRoot = resolve(workspaceRoot, "state");
const envPath = resolve(workspaceRoot, ".env");
const projectPath = resolve(workspaceRoot, "project");
await mkdir(projectPath, { recursive: true });
await execFileAsync("git", ["init", projectPath], { encoding: "utf8" });
await execFileAsync("git", ["-C", projectPath, "config", "user.name", "AI Ops"], { encoding: "utf8" });
await execFileAsync("git", ["-C", projectPath, "config", "user.email", "ai-ops@example.local"], { encoding: "utf8" });
await writeFile(resolve(projectPath, "README.md"), "base\n", "utf8");
await execFileAsync("git", ["-C", projectPath, "add", "README.md"], { encoding: "utf8" });
await execFileAsync("git", ["-C", projectPath, "commit", "-m", "initial"], { encoding: "utf8" });
await writeFile(
envPath,
[
`AGENT_STATE_ROOT=${stateRoot}`,
"AGENT_WORKTREE_ROOT=.ai_ops/worktrees",
"AGENT_WORKTREE_BASE_REF=HEAD",
].join("\n"),
"utf8",
);
const runService = new UiRunService({
workspaceRoot,
envFilePath: ".env",
});
const createdSession = await runService.createSession({
projectPath,
});
await writeFile(resolve(createdSession.baseWorkspacePath, "README.md"), "base branch update\n", "utf8");
await execFileAsync("git", ["-C", createdSession.baseWorkspacePath, "add", "README.md"], { encoding: "utf8" });
await execFileAsync("git", ["-C", createdSession.baseWorkspacePath, "commit", "-m", "base update"], { encoding: "utf8" });
await writeFile(resolve(projectPath, "README.md"), "project branch update\n", "utf8");
await execFileAsync("git", ["-C", projectPath, "add", "README.md"], { encoding: "utf8" });
await execFileAsync("git", ["-C", projectPath, "commit", "-m", "project update"], { encoding: "utf8" });
const closed = await runService.closeSession({
sessionId: createdSession.sessionId,
mergeToProject: true,
});
assert.equal(closed.sessionStatus, "closed_with_conflicts");
const baseWorkspaceStats = await stat(createdSession.baseWorkspacePath);
assert.equal(baseWorkspaceStats.isDirectory(), true);
});

View File

@@ -155,3 +155,41 @@ test("secure executor runs with explicit env policy", async () => {
assert.equal(result.stdout, "ok|\n");
assert.equal(streamedStdout, result.stdout);
});
test("rules engine carries session context in tool audit events", () => {
const events: Array<Record<string, unknown>> = [];
const rules = new SecurityRulesEngine(
{
allowedBinaries: ["git"],
worktreeRoot: "/tmp",
protectedPaths: [],
requireCwdWithinWorktree: true,
rejectRelativePathTraversal: true,
enforcePathBoundaryOnArguments: true,
allowedEnvAssignments: [],
blockedEnvAssignments: [],
},
(event) => {
events.push(event as unknown as Record<string, unknown>);
},
);
rules.assertToolInvocationAllowed({
tool: "git",
toolClearance: {
allowlist: ["git"],
banlist: [],
},
context: {
sessionId: "session-ctx",
nodeId: "node-ctx",
attempt: 2,
},
});
const allowedEvent = events.find((event) => event.type === "tool.invocation_allowed");
assert.ok(allowedEvent);
assert.equal(allowedEvent.sessionId, "session-ctx");
assert.equal(allowedEvent.nodeId, "node-ctx");
assert.equal(allowedEvent.attempt, 2);
});

View File

@@ -0,0 +1,230 @@
import test from "node:test";
import assert from "node:assert/strict";
import { execFile } from "node:child_process";
import { mkdtemp, mkdir, readFile, rm, stat, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { resolve } from "node:path";
import { promisify } from "node:util";
import {
FileSystemSessionMetadataStore,
SessionWorktreeManager,
type SessionMetadata,
} from "../src/agents/session-lifecycle.js";
const execFileAsync = promisify(execFile);
async function git(args: string[]): Promise<string> {
const { stdout } = await execFileAsync("git", args, {
encoding: "utf8",
});
return stdout.trim();
}
test("session metadata store persists and updates session metadata", async () => {
const stateRoot = await mkdtemp(resolve(tmpdir(), "ai-ops-session-store-"));
const store = new FileSystemSessionMetadataStore({ stateRoot });
const created = await store.createSession({
sessionId: "session-abc",
projectPath: resolve(stateRoot, "project"),
baseWorkspacePath: resolve(stateRoot, "worktrees", "session-abc", "base"),
});
assert.equal(created.sessionStatus, "active");
assert.equal(created.sessionId, "session-abc");
const listed = await store.listSessions();
assert.equal(listed.length, 1);
assert.equal(listed[0]?.sessionId, "session-abc");
const updated = await store.updateSession("session-abc", {
sessionStatus: "closed",
});
assert.equal(updated.sessionStatus, "closed");
const readBack = await store.readSession("session-abc");
assert.equal(readBack?.sessionStatus, "closed");
const closedWithConflicts = await store.updateSession("session-abc", {
sessionStatus: "closed_with_conflicts",
});
assert.equal(closedWithConflicts.sessionStatus, "closed_with_conflicts");
});
test("session worktree manager provisions and merges task worktrees", async () => {
const root = await mkdtemp(resolve(tmpdir(), "ai-ops-session-worktree-"));
const projectPath = resolve(root, "project");
const worktreeRoot = resolve(root, "worktrees");
await mkdir(projectPath, { recursive: true });
await git(["init", projectPath]);
await git(["-C", projectPath, "config", "user.name", "AI Ops"]);
await git(["-C", projectPath, "config", "user.email", "ai-ops@example.local"]);
await writeFile(resolve(projectPath, "README.md"), "# project\n", "utf8");
await git(["-C", projectPath, "add", "README.md"]);
await git(["-C", projectPath, "commit", "-m", "initial commit"]);
const manager = new SessionWorktreeManager({
worktreeRoot,
baseRef: "HEAD",
});
const sessionId = "session-1";
const baseWorkspacePath = manager.resolveBaseWorkspacePath(sessionId);
await manager.initializeSessionBaseWorkspace({
sessionId,
projectPath,
baseWorkspacePath,
});
const baseStats = await stat(baseWorkspacePath);
assert.equal(baseStats.isDirectory(), true);
const taskWorktreePath = (
await manager.ensureTaskWorktree({
sessionId,
taskId: "task-1",
baseWorkspacePath,
})
).taskWorktreePath;
await writeFile(resolve(taskWorktreePath, "feature.txt"), "task output\n", "utf8");
const mergeOutcome = await manager.mergeTaskIntoBase({
taskId: "task-1",
baseWorkspacePath,
taskWorktreePath,
});
assert.equal(mergeOutcome.kind, "success");
const mergedFile = await readFile(resolve(baseWorkspacePath, "feature.txt"), "utf8");
assert.equal(mergedFile, "task output\n");
const session: SessionMetadata = {
sessionId,
projectPath,
baseWorkspacePath,
sessionStatus: "active",
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
const closeOutcome = await manager.closeSession({
session,
taskWorktreePaths: [],
mergeBaseIntoProject: false,
});
assert.equal(closeOutcome.kind, "success");
await assert.rejects(() => stat(baseWorkspacePath), {
code: "ENOENT",
});
});
test("session worktree manager returns conflict outcome instead of throwing", async () => {
const root = await mkdtemp(resolve(tmpdir(), "ai-ops-session-worktree-conflict-"));
const projectPath = resolve(root, "project");
const worktreeRoot = resolve(root, "worktrees");
await mkdir(projectPath, { recursive: true });
await git(["init", projectPath]);
await git(["-C", projectPath, "config", "user.name", "AI Ops"]);
await git(["-C", projectPath, "config", "user.email", "ai-ops@example.local"]);
await writeFile(resolve(projectPath, "README.md"), "base\n", "utf8");
await git(["-C", projectPath, "add", "README.md"]);
await git(["-C", projectPath, "commit", "-m", "initial commit"]);
const manager = new SessionWorktreeManager({
worktreeRoot,
baseRef: "HEAD",
});
const sessionId = "session-conflict-1";
const baseWorkspacePath = manager.resolveBaseWorkspacePath(sessionId);
await manager.initializeSessionBaseWorkspace({
sessionId,
projectPath,
baseWorkspacePath,
});
const taskWorktreePath = (
await manager.ensureTaskWorktree({
sessionId,
taskId: "task-conflict",
baseWorkspacePath,
})
).taskWorktreePath;
await writeFile(resolve(baseWorkspacePath, "README.md"), "base branch change\n", "utf8");
await git(["-C", baseWorkspacePath, "add", "README.md"]);
await git(["-C", baseWorkspacePath, "commit", "-m", "base update"]);
await writeFile(resolve(taskWorktreePath, "README.md"), "task branch change\n", "utf8");
const mergeOutcome = await manager.mergeTaskIntoBase({
taskId: "task-conflict",
baseWorkspacePath,
taskWorktreePath,
});
assert.equal(mergeOutcome.kind, "conflict");
if (mergeOutcome.kind !== "conflict") {
throw new Error("Expected merge conflict outcome.");
}
assert.equal(mergeOutcome.taskId, "task-conflict");
assert.equal(mergeOutcome.worktreePath, taskWorktreePath);
assert.ok(mergeOutcome.conflictFiles.includes("README.md"));
});
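Because `mergeTaskIntoBase` reports conflicts as a return value rather than throwing, callers can branch on the outcome. A sketch of that branching, assuming the discriminated-union shape the tests above narrow on (`kind: "success"` versus `kind: "conflict"` with `taskId`, `worktreePath`, and `conflictFiles`); `describeMergeOutcome` is a hypothetical helper, not part of `SessionWorktreeManager`:

```typescript
// Outcome shape inferred from the assertions in the conflict test above.
type MergeOutcome =
  | { kind: "success" }
  | { kind: "conflict"; taskId: string; worktreePath: string; conflictFiles: string[] };

// Turn a merge outcome into a short human-readable status line, e.g. for
// surfacing in session logs before deciding whether to retry the merge.
function describeMergeOutcome(outcome: MergeOutcome): string {
  if (outcome.kind === "success") {
    return "merged cleanly";
  }
  return `conflict in ${outcome.taskId}: ${outcome.conflictFiles.join(", ")}`;
}
```

In this codebase, retrying after a conflict would presumably be bounded by a limit such as `AGENT_MERGE_CONFLICT_MAX_ATTEMPTS`, but that wiring is outside these tests.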
test("session worktree manager recreates a task worktree after stale metadata prune", async () => {
const root = await mkdtemp(resolve(tmpdir(), "ai-ops-session-worktree-prune-"));
const projectPath = resolve(root, "project");
const worktreeRoot = resolve(root, "worktrees");
await mkdir(projectPath, { recursive: true });
await git(["init", projectPath]);
await git(["-C", projectPath, "config", "user.name", "AI Ops"]);
await git(["-C", projectPath, "config", "user.email", "ai-ops@example.local"]);
await writeFile(resolve(projectPath, "README.md"), "# project\n", "utf8");
await git(["-C", projectPath, "add", "README.md"]);
await git(["-C", projectPath, "commit", "-m", "initial commit"]);
const manager = new SessionWorktreeManager({
worktreeRoot,
baseRef: "HEAD",
});
const sessionId = "session-prune-1";
const taskId = "task-prune-1";
const baseWorkspacePath = manager.resolveBaseWorkspacePath(sessionId);
await manager.initializeSessionBaseWorkspace({
sessionId,
projectPath,
baseWorkspacePath,
});
const initialTaskWorktreePath = (
await manager.ensureTaskWorktree({
sessionId,
taskId,
baseWorkspacePath,
})
).taskWorktreePath;
await rm(initialTaskWorktreePath, { recursive: true, force: true });
const recreatedTaskWorktreePath = (
await manager.ensureTaskWorktree({
sessionId,
taskId,
baseWorkspacePath,
})
).taskWorktreePath;
assert.equal(recreatedTaskWorktreePath, initialTaskWorktreePath);
const stats = await stat(recreatedTaskWorktreePath);
assert.equal(stats.isDirectory(), true);
});

View File