"""Dev Agent Manager - Spawns Dev Agents in Docker containers via Claude Code."""

import asyncio
import logging
import os
import re
import tempfile
from pathlib import Path

import pexpect

logger = logging.getLogger(__name__)

PROMPT_TEMPLATE_PATH = Path(__file__).resolve().parent.parent / "prompts" / "dev_task_execution.txt"

# Patterns used by DevAgentManager.parse_claude_output, compiled once at import.
# "Created foo.py", "Modifying src/app.ts", ... -> captures the path token.
_FILE_ACTION_RE = re.compile(r"(?:(?:Creat|Modifi|Updat|Edit|Writ)(?:ed|ing)\s+)([^\s]+\.\w+)")
# Bare tokens that look like source files (whitelisted extensions only).
_STANDALONE_PATH_RE = re.compile(r"(?:^|\s)([\w./]+\.(?:py|js|ts|yaml|yml|json|txt|md|toml|cfg))\b")
_PASSED_RE = re.compile(r"(\d+)\s+passed")
_FAILED_RE = re.compile(r"(\d+)\s+failed")
_ERROR_RE = re.compile(r"(?:Error|Exception|FAILED)[:\s]+(.*?)(?:\n|$)", re.IGNORECASE)


class DevAgentManager:
    """Spawns Dev Agents in Docker containers, interfaces with Claude Code via pexpect."""

    def __init__(self, docker_client=None, max_retries: int = 3, timeout: int = 1800):
        """Initialize DevAgentManager.

        Args:
            docker_client: Docker client instance (or None to create from env).
            max_retries: Maximum number of execution attempts per task.
            timeout: Timeout in seconds for Claude Code execution (default 30 min).
        """
        if docker_client is not None:
            self.docker_client = docker_client
        else:
            # Imported lazily so the module can be imported (and a client
            # injected) without the docker SDK being installed.
            import docker

            self.docker_client = docker.from_env()
        self.max_retries = max_retries
        self.timeout = timeout
        # task_id -> number of attempts made by execute_with_retry.
        self._retry_counts: dict[str, int] = {}

    def prepare_task_prompt(self, task: dict, global_arch: str = "") -> str:
        """Build a prompt string for the Dev Agent from the template.

        Args:
            task: Task dict with keys task_id (or id), title, description,
                details, testStrategy. Missing keys default to "".
            global_arch: Optional global architecture summary.

        Returns:
            Formatted prompt string.

        Raises:
            OSError: If the template file cannot be read.
            KeyError: If the template references a placeholder not supplied here.
        """
        # Explicit encoding: do not depend on the host locale for the template.
        template = PROMPT_TEMPLATE_PATH.read_text(encoding="utf-8")
        return template.format(
            task_id=task.get("task_id", task.get("id", "")),
            title=task.get("title", ""),
            description=task.get("description", ""),
            details=task.get("details", ""),
            test_strategy=task.get("testStrategy", ""),
            global_architecture=global_arch or "No architecture context provided.",
        )

    async def execute_task(
        self,
        task: dict,
        container_id: str,
        worktree_path: str,
        global_arch: str = "",
    ) -> dict:
        """Execute a task inside a Docker container using Claude Code.

        The pexpect interaction is blocking and may last up to ``self.timeout``
        seconds, so it is run in the event loop's default executor instead of
        directly in this coroutine.

        Args:
            task: Task dict.
            container_id: Docker container ID to exec into.
            worktree_path: Host path to the worktree (mounted at /workspace).
            global_arch: Optional architecture context.

        Returns:
            Dict with status, output, files_changed, and exit_code.
        """
        prompt = self.prepare_task_prompt(task, global_arch)

        # The prompt is written into the worktree so it is visible inside the
        # container under /workspace.
        prompt_file = os.path.join(worktree_path, ".task_prompt.txt")

        # NOTE(review): confirm the Claude Code CLI installed in the container
        # accepts `--prompt-file`; the flag is passed through unchanged here.
        cmd = f"docker exec {container_id} claude --print --prompt-file /workspace/.task_prompt.txt"

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._run_claude_blocking, cmd, prompt_file, prompt
        )

    def _run_claude_blocking(self, cmd: str, prompt_file: str, prompt: str) -> dict:
        """Write the prompt file, run ``cmd`` to completion and build the result.

        Spawning, waiting and closing all happen in the calling thread; the
        prompt file is removed afterwards whatever the outcome.

        Args:
            cmd: Full command line handed to ``pexpect.spawn``.
            prompt_file: Host path the prompt is written to.
            prompt: Prompt text.

        Returns:
            Dict with status, output, files_changed, and exit_code. On timeout
            the output is the literal string "timeout" and exit_code is -1.
        """
        child = None
        try:
            with open(prompt_file, "w", encoding="utf-8") as f:
                f.write(prompt)

            child = pexpect.spawn(cmd, timeout=self.timeout, encoding="utf-8")
            child.expect(pexpect.EOF, timeout=self.timeout)
            output = child.before or ""
            child.close()
            # exitstatus is None when the child did not exit normally.
            exit_code = child.exitstatus if child.exitstatus is not None else -1
        except pexpect.TIMEOUT:
            logger.warning("Claude Code timed out after %s seconds", self.timeout)
            if child is not None:
                try:
                    child.close(force=True)
                except Exception:
                    # Best effort: the task is already being reported as failed.
                    logger.debug("Could not force-close timed-out child", exc_info=True)
            return {
                "status": "failed",
                "output": "timeout",
                "files_changed": [],
                "exit_code": -1,
            }
        finally:
            # Clean up prompt file
            try:
                os.remove(prompt_file)
            except OSError:
                logger.debug("Could not remove prompt file %s", prompt_file, exc_info=True)

        parsed = self.parse_claude_output(output)
        return {
            "status": "success" if exit_code == 0 else "failed",
            "output": output,
            "files_changed": parsed["files_changed"],
            "exit_code": exit_code,
        }

    def parse_claude_output(self, output: str) -> dict:
        """Parse Claude Code output to extract structured info.

        Args:
            output: Raw stdout from Claude Code. ``None`` or "" yields empty
                results.

        Returns:
            Dict with files_changed (list of path strings, de-duplicated in
            first-seen order), test_results (dict with optional int keys
            "passed" and "failed"), and errors (list of message strings).
        """
        output = output or ""

        # Paths following an action verb come first, then standalone
        # source-like paths.
        action_paths = _FILE_ACTION_RE.findall(output)
        standalone_paths = _STANDALONE_PATH_RE.findall(output)
        # dict.fromkeys de-duplicates while preserving order.
        all_files = list(dict.fromkeys(action_paths + standalone_paths))

        # Only the first "<n> passed" / "<n> failed" occurrence is used.
        test_results = {}
        for key, pattern in (("passed", _PASSED_RE), ("failed", _FAILED_RE)):
            match = pattern.search(output)
            if match:
                test_results[key] = int(match.group(1))

        # Text following "Error", "Exception" or "FAILED" (case-insensitive),
        # up to the end of that line.
        errors = _ERROR_RE.findall(output)

        return {
            "files_changed": all_files,
            "test_results": test_results,
            "errors": errors,
        }

    async def execute_with_retry(
        self,
        task: dict,
        container_id: str,
        worktree_path: str,
        global_arch: str = "",
    ) -> dict:
        """Execute a task with retry logic.

        Makes up to max_retries attempts. The first successful result is
        returned as-is. If every attempt fails (or max_retries is not
        positive, so no attempt is made), returns a result with status
        'needs_clarification' carrying the last attempt's output,
        files_changed and exit_code.

        Args:
            task: Task dict.
            container_id: Docker container ID.
            worktree_path: Host worktree path.
            global_arch: Optional architecture context.

        Returns:
            Final execution result dict.
        """
        task_id = str(task.get("task_id", task.get("id", "")))

        # Pre-initialised so the fallback below is well defined even when the
        # loop body never runs (max_retries <= 0).
        result: dict = {}
        for attempt in range(self.max_retries):
            self._retry_counts[task_id] = attempt + 1
            result = await self.execute_task(task, container_id, worktree_path, global_arch)
            if result["status"] == "success":
                return result
            logger.warning(
                "Task %s attempt %d/%d failed (exit_code=%s)",
                task_id,
                attempt + 1,
                self.max_retries,
                result.get("exit_code"),
            )

        # All retries exhausted
        return {
            "status": "needs_clarification",
            "output": result.get("output", ""),
            "files_changed": result.get("files_changed", []),
            "exit_code": result.get("exit_code", -1),
        }

    def get_retry_count(self, task_id: str) -> int:
        """Return current retry count for a task (0 if none recorded)."""
        return self._retry_counts.get(task_id, 0)

    def reset_retry_count(self, task_id: str):
        """Reset retry counter for a task (after clarification resolved)."""
        self._retry_counts.pop(task_id, None)