181 lines
5.9 KiB
Python
181 lines
5.9 KiB
Python
"""Task Master Agent - Bridge to claude-task-master for task graph management."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TaskMasterAgent:
|
|
"""Bridge to claude-task-master for task graph management and dependency resolution."""
|
|
|
|
def __init__(self, project_root: str, mcp_client=None):
|
|
self.project_root = str(project_root)
|
|
self.mcp_client = mcp_client
|
|
self.max_retries = 3
|
|
self.base_delay = 1.0
|
|
|
|
async def parse_prd(self, prd_content: str, num_tasks: int = 10) -> dict:
|
|
"""Write PRD content to disk and invoke task-master parse-prd."""
|
|
docs_dir = Path(self.project_root) / ".taskmaster" / "docs"
|
|
docs_dir.mkdir(parents=True, exist_ok=True)
|
|
prd_path = docs_dir / "prd.md"
|
|
prd_path.write_text(prd_content)
|
|
|
|
result = await self._call_with_retry(
|
|
self._run_cli,
|
|
"parse-prd",
|
|
str(prd_path),
|
|
"--num-tasks",
|
|
str(num_tasks),
|
|
"--force",
|
|
)
|
|
return result
|
|
|
|
async def get_unblocked_tasks(self) -> list:
|
|
"""Get all pending tasks whose dependencies are all done."""
|
|
result = await self._call_with_retry(self._run_cli, "list", "--json")
|
|
tasks = result.get("tasks", [])
|
|
|
|
done_ids = {
|
|
str(t["id"]) for t in tasks if t.get("status") == "done"
|
|
}
|
|
|
|
unblocked = []
|
|
for task in tasks:
|
|
if task.get("status") != "pending":
|
|
continue
|
|
deps = [str(d) for d in task.get("dependencies", [])]
|
|
if all(d in done_ids for d in deps):
|
|
unblocked.append(task)
|
|
|
|
return unblocked
|
|
|
|
async def update_task_status(
|
|
self, task_id: str, status: str, notes: str = ""
|
|
):
|
|
"""Update a task's status and optionally add implementation notes."""
|
|
await self._call_with_retry(
|
|
self._run_cli,
|
|
"set-status",
|
|
f"--id={task_id}",
|
|
f"--status={status}",
|
|
)
|
|
if notes:
|
|
await self._call_with_retry(
|
|
self._run_cli,
|
|
"update-subtask",
|
|
f"--id={task_id}",
|
|
f"--prompt={notes}",
|
|
)
|
|
|
|
async def get_task_details(self, task_id: str) -> dict:
|
|
"""Get full details for a specific task."""
|
|
result = await self._call_with_retry(
|
|
self._run_cli, "show", str(task_id), "--json"
|
|
)
|
|
task = result.get("task", result)
|
|
return {
|
|
"id": task.get("id"),
|
|
"title": task.get("title", ""),
|
|
"description": task.get("description", ""),
|
|
"details": task.get("details", ""),
|
|
"testStrategy": task.get("testStrategy", ""),
|
|
"dependencies": task.get("dependencies", []),
|
|
"subtasks": task.get("subtasks", []),
|
|
"status": task.get("status", "pending"),
|
|
"priority": task.get("priority", ""),
|
|
}
|
|
|
|
async def get_next_task(self) -> dict | None:
|
|
"""Get the highest-priority unblocked task, or None."""
|
|
try:
|
|
result = await self._call_with_retry(
|
|
self._run_cli, "next", "--json"
|
|
)
|
|
task = result.get("task", result)
|
|
if task and task.get("id"):
|
|
return task
|
|
except RuntimeError:
|
|
logger.debug("next_task command failed, falling back to manual selection")
|
|
|
|
unblocked = await self.get_unblocked_tasks()
|
|
if not unblocked:
|
|
return None
|
|
|
|
priority_order = {"high": 0, "medium": 1, "low": 2}
|
|
unblocked.sort(
|
|
key=lambda t: (
|
|
priority_order.get(t.get("priority", "medium"), 1),
|
|
t.get("id", 0),
|
|
)
|
|
)
|
|
return unblocked[0]
|
|
|
|
async def expand_task(self, task_id: str, num_subtasks: int = 5) -> dict:
|
|
"""Break a task into subtasks."""
|
|
result = await self._call_with_retry(
|
|
self._run_cli,
|
|
"expand",
|
|
f"--id={task_id}",
|
|
f"--num={num_subtasks}",
|
|
"--force",
|
|
)
|
|
return result
|
|
|
|
async def _call_with_retry(self, func, *args, **kwargs):
|
|
"""Retry with exponential backoff."""
|
|
last_exc = None
|
|
for attempt in range(self.max_retries):
|
|
try:
|
|
return await func(*args, **kwargs)
|
|
except Exception as exc:
|
|
last_exc = exc
|
|
if attempt < self.max_retries - 1:
|
|
delay = self.base_delay * (2 ** attempt)
|
|
logger.warning(
|
|
"Attempt %d/%d failed: %s. Retrying in %.1fs",
|
|
attempt + 1,
|
|
self.max_retries,
|
|
exc,
|
|
delay,
|
|
)
|
|
await asyncio.sleep(delay)
|
|
raise RuntimeError(
|
|
f"All {self.max_retries} attempts failed. Last error: {last_exc}"
|
|
) from last_exc
|
|
|
|
async def _run_cli(self, *args: str) -> dict:
|
|
"""Execute a task-master CLI command and return parsed JSON output."""
|
|
cmd = ["task-master", *args]
|
|
logger.debug("Running CLI: %s", " ".join(cmd))
|
|
|
|
proc = await asyncio.get_event_loop().run_in_executor(
|
|
None,
|
|
lambda: subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
cwd=self.project_root,
|
|
timeout=120,
|
|
),
|
|
)
|
|
|
|
if proc.returncode != 0:
|
|
raise RuntimeError(
|
|
f"task-master {args[0]} failed (rc={proc.returncode}): {proc.stderr.strip()}"
|
|
)
|
|
|
|
stdout = proc.stdout.strip()
|
|
if not stdout:
|
|
return {}
|
|
|
|
try:
|
|
return json.loads(stdout)
|
|
except json.JSONDecodeError:
|
|
return {"raw_output": stdout}
|