"""QA Agent - Handles code review, testing, linting, and merge operations.""" import os import re import subprocess from pathlib import Path import git from app_factory.core.claude_client import ClaudeSDKClient class QAAgent: """Reviews code, runs tests, handles merge conflicts, merges worktrees to main.""" def __init__( self, repo_path: str, api_key: str = None, auth_token: str = None, max_retries: int = 3, debug: bool = False, observability=None, ): """Initialize QAAgent. Args: repo_path: Path to the git repository. api_key: Optional API key. Falls back to ANTHROPIC_API_KEY env var. max_retries: Maximum QA-Dev bounce retries per task. """ self.repo = git.Repo(repo_path) self.repo_path = Path(repo_path).resolve() self.max_retries = max_retries self._retry_counts: dict[str, int] = {} self._prompts_dir = Path(__file__).resolve().parent.parent / "prompts" self.observability = observability resolved_key = api_key or os.environ.get("ANTHROPIC_API_KEY") resolved_auth = auth_token or os.environ.get("ANTHROPIC_AUTH_TOKEN") self.client = ClaudeSDKClient( api_key=resolved_key, auth_token=resolved_auth, enable_debug=debug, ) async def review_and_merge(self, task_id: str, worktree_path: str, task: dict = None) -> dict: """Full QA pipeline: rebase, lint, test, review, merge. Returns: dict with status and details. Status is one of: 'merged', 'rebase_failed', 'lint_failed', 'tests_failed', 'review_failed'. """ # 1. Rebase feature branch onto main rebase_result = await self.rebase_onto_main(worktree_path, task_id) if not rebase_result["success"]: self._increment_retry(task_id) return { "status": "rebase_failed", "conflicts": rebase_result.get("conflicts", []), "retry_count": self.get_retry_count(task_id), } # 2. Run linting lint_result = self.run_linter(worktree_path) if not lint_result["passed"]: self._increment_retry(task_id) return { "status": "lint_failed", "errors": lint_result["errors"], "warnings": lint_result["warnings"], "retry_count": self.get_retry_count(task_id), } # 3. Run tests test_result = self.run_tests(worktree_path) if not test_result["passed"]: self._increment_retry(task_id) return { "status": "tests_failed", "total": test_result["total"], "failures": test_result["failures"], "errors": test_result["errors"], "output": test_result["output"], "retry_count": self.get_retry_count(task_id), } # 4. Code review via Claude wt_repo = git.Repo(worktree_path) diff = wt_repo.git.diff("main", "--", ".") review_result = await self.code_review(diff, task=task) if not review_result["approved"]: self._increment_retry(task_id) return { "status": "review_failed", "issues": review_result["issues"], "summary": review_result["summary"], "retry_count": self.get_retry_count(task_id), } # 5. Merge to main merge_result = self.merge_to_main(worktree_path, task_id) if not merge_result["success"]: return { "status": "merge_failed", "error": merge_result.get("error", "Unknown merge error"), } return { "status": "merged", "commit_sha": merge_result["commit_sha"], "review_summary": review_result["summary"], } async def rebase_onto_main(self, worktree_path: str, task_id: str) -> dict: """Rebase the feature branch in the worktree onto main. Returns: dict with success bool and conflicts list. """ wt_repo = git.Repo(worktree_path) try: wt_repo.git.fetch("origin", "main") except git.GitCommandError: pass # fetch may fail in local-only repos; continue with local main try: wt_repo.git.rebase("main") return {"success": True, "conflicts": []} except git.GitCommandError: # Rebase failed — check for conflicts conflicts = self._get_conflict_files(wt_repo) if conflicts and self.auto_resolve_conflicts(worktree_path): return {"success": True, "conflicts": []} # Abort the failed rebase try: wt_repo.git.rebase("--abort") except git.GitCommandError: pass return {"success": False, "conflicts": conflicts} def run_linter(self, worktree_path: str) -> dict: """Run ruff linter on the worktree. Returns: dict with passed bool, errors list, and warnings list. """ try: result = subprocess.run( ["ruff", "check", "."], cwd=worktree_path, capture_output=True, text=True, timeout=120, ) except FileNotFoundError: return {"passed": True, "errors": [], "warnings": ["ruff not found, skipping lint"]} except subprocess.TimeoutExpired: return {"passed": False, "errors": ["Linter timed out"], "warnings": []} errors = [] warnings = [] for line in result.stdout.splitlines(): line = line.strip() if not line or line.startswith("Found") or line.startswith("All checks"): continue # ruff output lines contain error codes like E501, W291, etc. if re.search(r"\b[A-Z]\d{3,4}\b", line): errors.append(line) elif line: warnings.append(line) passed = result.returncode == 0 return {"passed": passed, "errors": errors, "warnings": warnings} def run_tests(self, worktree_path: str) -> dict: """Run pytest in the worktree. Returns: dict with passed bool, total/failures/errors counts, and raw output. """ try: result = subprocess.run( ["python", "-m", "pytest", "-v", "--tb=short"], cwd=worktree_path, capture_output=True, text=True, timeout=300, ) except FileNotFoundError: return {"passed": False, "total": 0, "failures": 0, "errors": 1, "output": "pytest not found"} except subprocess.TimeoutExpired: return {"passed": False, "total": 0, "failures": 0, "errors": 1, "output": "Test execution timed out"} output = result.stdout + result.stderr parsed = self.parse_test_results(output) parsed["output"] = output return parsed async def code_review(self, diff: str, task: dict = None) -> dict: """Review a diff using Claude for quality and security issues. Returns: dict with approved bool, issues list, and summary string. """ template = self._load_template("qa_review.txt") task_context = "" if task: task_context = ( f"Task ID: {task.get('id', 'N/A')}\n" f"Title: {task.get('title', 'N/A')}\n" f"Description: {task.get('description', 'N/A')}" ) prompt = template.format(task_context=task_context, diff=diff) response = await self.client.complete( prompt=prompt, model="claude-sonnet-4-6", max_turns=100, observability=self.observability, agent_name="qa_agent", task_id=str(task.get("id", task.get("task_id", "review"))) if task else "review", ) if self.observability: self.observability.log_token_usage( "qa_agent", str(task.get("id", task.get("task_id", "review"))) if task else "review", input_tokens=response.input_tokens, output_tokens=response.output_tokens, model="claude-sonnet-4-6", ) text = response.text return self._parse_review_response(text) def merge_to_main(self, worktree_path: str, task_id: str) -> dict: """Merge the feature branch into main with --no-ff. Returns: dict with success bool and commit_sha. """ branch_name = f"feature/task-{task_id}" try: self.repo.git.checkout("main") self.repo.git.merge("--no-ff", branch_name, m=f"Merge {branch_name}") commit_sha = self.repo.head.commit.hexsha return {"success": True, "commit_sha": commit_sha} except git.GitCommandError as e: return {"success": False, "commit_sha": None, "error": str(e)} def auto_resolve_conflicts(self, worktree_path: str) -> bool: """Try to auto-resolve simple merge conflicts. Returns True if all conflicts were resolved. """ wt_repo = git.Repo(worktree_path) unmerged = wt_repo.index.unmerged_blobs() if not unmerged: return True for path in unmerged: file_path = os.path.join(worktree_path, path) if not os.path.exists(file_path): continue try: with open(file_path) as f: content = f.read() # Accept "theirs" (incoming) for simple conflicts if "<<<<<<< " in content and "=======" in content and ">>>>>>> " in content: resolved = re.sub( r"<<<<<<< [^\n]*\n.*?=======\n(.*?)>>>>>>> [^\n]*\n", r"\1", content, flags=re.DOTALL, ) with open(file_path, "w") as f: f.write(resolved) wt_repo.index.add([path]) else: return False except Exception: return False try: wt_repo.git.rebase("--continue") return True except git.GitCommandError: return False def parse_test_results(self, output: str) -> dict: """Parse pytest output into structured results. Returns: dict with passed bool, total int, failures int, errors int. """ # Match pytest summary line like "5 passed, 2 failed, 1 error" passed_count = 0 failed_count = 0 error_count = 0 # Look for the summary line summary_match = re.search( r"=+\s*(.*?)\s*=+\s*$", output, re.MULTILINE, ) if summary_match: summary_line = summary_match.group(1) p = re.search(r"(\d+)\s+passed", summary_line) f = re.search(r"(\d+)\s+failed", summary_line) e = re.search(r"(\d+)\s+error", summary_line) if p: passed_count = int(p.group(1)) if f: failed_count = int(f.group(1)) if e: error_count = int(e.group(1)) total = passed_count + failed_count + error_count all_passed = failed_count == 0 and error_count == 0 and total > 0 return { "passed": all_passed, "total": total, "failures": failed_count, "errors": error_count, } def get_retry_count(self, task_id: str) -> int: """Return QA retry count for a task.""" return self._retry_counts.get(task_id, 0) def _increment_retry(self, task_id: str): """Increment the retry counter for a task.""" self._retry_counts[task_id] = self._retry_counts.get(task_id, 0) + 1 def _load_template(self, template_name: str) -> str: """Load a prompt template file from app_factory/prompts/.""" path = self._prompts_dir / template_name return path.read_text() def _get_conflict_files(self, repo: git.Repo) -> list[str]: """Get list of conflicting files from a repo.""" try: status_output = repo.git.status("--porcelain") conflicts = [] for line in status_output.splitlines(): if line.startswith("UU ") or line.startswith("AA "): conflicts.append(line[3:].strip()) return conflicts except git.GitCommandError: return [] def _parse_review_response(self, text: str) -> dict: """Parse Claude's review response into structured data.""" approved = False issues = [] summary = "" for line in text.splitlines(): line = line.strip() if line.upper().startswith("APPROVED:"): value = line.split(":", 1)[1].strip().lower() approved = value in ("true", "yes") elif line.startswith("- ["): # Parse issue lines like "- [severity: critical] description" issue_match = re.match( r"-\s*\[severity:\s*(critical|warning|info)\]\s*(.*)", line, re.IGNORECASE, ) if issue_match: issues.append({ "severity": issue_match.group(1).lower(), "description": issue_match.group(2).strip(), }) elif line.upper().startswith("SUMMARY:"): summary = line.split(":", 1)[1].strip() return {"approved": approved, "issues": issues, "summary": summary}