"""Task Master Agent - Bridge to claude-task-master for task graph management.""" import asyncio import json import logging import os import subprocess from pathlib import Path logger = logging.getLogger(__name__) class TaskMasterAgent: """Bridge to claude-task-master for task graph management and dependency resolution.""" def __init__(self, project_root: str, mcp_client=None): self.project_root = str(project_root) self.mcp_client = mcp_client self.max_retries = 3 self.base_delay = 1.0 async def parse_prd(self, prd_content: str, num_tasks: int = 10) -> dict: """Write PRD content to disk and invoke task-master parse-prd.""" docs_dir = Path(self.project_root) / ".taskmaster" / "docs" docs_dir.mkdir(parents=True, exist_ok=True) prd_path = docs_dir / "prd.md" prd_path.write_text(prd_content) result = await self._call_with_retry( self._run_cli, "parse-prd", str(prd_path), "--num-tasks", str(num_tasks), "--force", ) return result async def get_unblocked_tasks(self) -> list: """Get all pending tasks whose dependencies are all done.""" result = await self._call_with_retry(self._run_cli, "list", "--json") tasks = result.get("tasks", []) done_ids = { str(t["id"]) for t in tasks if t.get("status") == "done" } unblocked = [] for task in tasks: if task.get("status") != "pending": continue deps = [str(d) for d in task.get("dependencies", [])] if all(d in done_ids for d in deps): unblocked.append(task) return unblocked async def update_task_status( self, task_id: str, status: str, notes: str = "" ): """Update a task's status and optionally add implementation notes.""" await self._call_with_retry( self._run_cli, "set-status", f"--id={task_id}", f"--status={status}", ) if notes: await self._call_with_retry( self._run_cli, "update-subtask", f"--id={task_id}", f"--prompt={notes}", ) async def get_task_details(self, task_id: str) -> dict: """Get full details for a specific task.""" result = await self._call_with_retry( self._run_cli, "show", str(task_id), "--json" ) task = result.get("task", result) return { "id": task.get("id"), "title": task.get("title", ""), "description": task.get("description", ""), "details": task.get("details", ""), "testStrategy": task.get("testStrategy", ""), "dependencies": task.get("dependencies", []), "subtasks": task.get("subtasks", []), "status": task.get("status", "pending"), "priority": task.get("priority", ""), } async def get_next_task(self) -> dict | None: """Get the highest-priority unblocked task, or None.""" try: result = await self._call_with_retry( self._run_cli, "next", "--json" ) task = result.get("task", result) if task and task.get("id"): return task except RuntimeError: logger.debug("next_task command failed, falling back to manual selection") unblocked = await self.get_unblocked_tasks() if not unblocked: return None priority_order = {"high": 0, "medium": 1, "low": 2} unblocked.sort( key=lambda t: ( priority_order.get(t.get("priority", "medium"), 1), t.get("id", 0), ) ) return unblocked[0] async def expand_task(self, task_id: str, num_subtasks: int = 5) -> dict: """Break a task into subtasks.""" result = await self._call_with_retry( self._run_cli, "expand", f"--id={task_id}", f"--num={num_subtasks}", "--force", ) return result async def _call_with_retry(self, func, *args, **kwargs): """Retry with exponential backoff.""" last_exc = None for attempt in range(self.max_retries): try: return await func(*args, **kwargs) except Exception as exc: last_exc = exc if attempt < self.max_retries - 1: delay = self.base_delay * (2 ** attempt) logger.warning( "Attempt %d/%d failed: %s. Retrying in %.1fs", attempt + 1, self.max_retries, exc, delay, ) await asyncio.sleep(delay) raise RuntimeError( f"All {self.max_retries} attempts failed. Last error: {last_exc}" ) from last_exc async def _run_cli(self, *args: str) -> dict: """Execute a task-master CLI command and return parsed JSON output.""" cmd = ["task-master", *args] logger.debug("Running CLI: %s", " ".join(cmd)) proc = await asyncio.get_event_loop().run_in_executor( None, lambda: subprocess.run( cmd, capture_output=True, text=True, cwd=self.project_root, timeout=120, ), ) if proc.returncode != 0: raise RuntimeError( f"task-master {args[0]} failed (rc={proc.returncode}): {proc.stderr.strip()}" ) stdout = proc.stdout.strip() if not stdout: return {} try: return json.loads(stdout) except json.JSONDecodeError: return {"raw_output": stdout}