"""Tests for TaskMasterAgent.""" import asyncio import json import subprocess from unittest.mock import AsyncMock, MagicMock, patch import pytest from app_factory.agents.task_agent import TaskMasterAgent @pytest.fixture def agent(tmp_path): return TaskMasterAgent(project_root=str(tmp_path)) def _cli_result(data, returncode=0, stderr=""): """Build a mock subprocess.CompletedProcess returning JSON data.""" result = MagicMock(spec=subprocess.CompletedProcess) result.returncode = returncode result.stdout = json.dumps(data) if isinstance(data, dict) else data result.stderr = stderr return result # --- parse_prd --- @pytest.mark.asyncio async def test_parse_prd_writes_file_and_calls_cli(agent, tmp_path): prd_text = "# My PRD\nBuild a thing." cli_output = {"tasks": [{"id": 1, "title": "Task 1"}], "count": 1} with patch("subprocess.run", return_value=_cli_result(cli_output)) as mock_run: result = await agent.parse_prd(prd_text, num_tasks=5) prd_path = tmp_path / ".taskmaster" / "docs" / "prd.md" assert prd_path.exists() assert prd_path.read_text() == prd_text mock_run.assert_called_once() call_args = mock_run.call_args[0][0] assert call_args[0] == "task-master" assert "parse-prd" in call_args assert "--num-tasks" in call_args assert "5" in call_args assert result == cli_output # --- get_unblocked_tasks --- @pytest.mark.asyncio async def test_get_unblocked_tasks_filters_correctly(agent): tasks_data = { "tasks": [ {"id": 1, "title": "Done task", "status": "done", "dependencies": []}, {"id": 2, "title": "Pending, no deps", "status": "pending", "dependencies": []}, {"id": 3, "title": "Pending, dep done", "status": "pending", "dependencies": [1]}, {"id": 4, "title": "Pending, dep not done", "status": "pending", "dependencies": [5]}, {"id": 5, "title": "In-progress", "status": "in-progress", "dependencies": []}, ] } with patch("subprocess.run", return_value=_cli_result(tasks_data)): unblocked = await agent.get_unblocked_tasks() ids = [t["id"] for t in unblocked] assert 2 in ids assert 3 in ids assert 4 not in ids # dependency 5 is not done assert 1 not in ids # status is done, not pending assert 5 not in ids # status is in-progress @pytest.mark.asyncio async def test_get_unblocked_tasks_empty(agent): with patch("subprocess.run", return_value=_cli_result({"tasks": []})): unblocked = await agent.get_unblocked_tasks() assert unblocked == [] # --- update_task_status --- @pytest.mark.asyncio async def test_update_task_status_without_notes(agent): with patch("subprocess.run", return_value=_cli_result({})) as mock_run: await agent.update_task_status("3", "done") assert mock_run.call_count == 1 call_args = mock_run.call_args[0][0] assert "--id=3" in call_args assert "--status=done" in call_args @pytest.mark.asyncio async def test_update_task_status_with_notes(agent): with patch("subprocess.run", return_value=_cli_result({})) as mock_run: await agent.update_task_status("3", "in-progress", notes="Started work") assert mock_run.call_count == 2 first_call = mock_run.call_args_list[0][0][0] second_call = mock_run.call_args_list[1][0][0] assert "set-status" in first_call assert "update-subtask" in second_call assert "--prompt=Started work" in second_call # --- get_task_details --- @pytest.mark.asyncio async def test_get_task_details_returns_correct_structure(agent): task_data = { "task": { "id": 2, "title": "Auth system", "description": "Implement JWT auth", "details": "Use bcrypt for hashing", "testStrategy": "Unit tests for auth", "dependencies": [1], "subtasks": [{"id": "2.1", "title": "Setup JWT"}], "status": "pending", "priority": "high", } } with patch("subprocess.run", return_value=_cli_result(task_data)): result = await agent.get_task_details("2") assert result["id"] == 2 assert result["title"] == "Auth system" assert result["description"] == "Implement JWT auth" assert result["details"] == "Use bcrypt for hashing" assert result["testStrategy"] == "Unit tests for auth" assert result["dependencies"] == [1] assert len(result["subtasks"]) == 1 assert result["status"] == "pending" assert result["priority"] == "high" # --- get_next_task --- @pytest.mark.asyncio async def test_get_next_task_uses_cli(agent): task_data = {"task": {"id": 3, "title": "Next task", "status": "pending"}} with patch("subprocess.run", return_value=_cli_result(task_data)): result = await agent.get_next_task() assert result["id"] == 3 @pytest.mark.asyncio async def test_get_next_task_fallback_when_cli_fails(agent): tasks_data = { "tasks": [ {"id": 1, "title": "Done", "status": "done", "dependencies": []}, {"id": 2, "title": "Low", "status": "pending", "dependencies": [], "priority": "low"}, {"id": 3, "title": "High", "status": "pending", "dependencies": [], "priority": "high"}, ] } fail_result = _cli_result("", returncode=1, stderr="error") call_count = 0 def side_effect(*args, **kwargs): nonlocal call_count call_count += 1 cmd = args[0] if "next" in cmd: return fail_result return _cli_result(tasks_data) with patch("subprocess.run", side_effect=side_effect): result = await agent.get_next_task() assert result["id"] == 3 # high priority comes first @pytest.mark.asyncio async def test_get_next_task_returns_none_when_all_done(agent): tasks_data = { "tasks": [ {"id": 1, "title": "Done", "status": "done", "dependencies": []}, ] } fail_result = _cli_result("", returncode=1, stderr="error") def side_effect(*args, **kwargs): cmd = args[0] if "next" in cmd: return fail_result return _cli_result(tasks_data) with patch("subprocess.run", side_effect=side_effect): result = await agent.get_next_task() assert result is None # --- expand_task --- @pytest.mark.asyncio async def test_expand_task(agent): expand_data = {"subtasks": [{"id": "1.1"}, {"id": "1.2"}]} with patch("subprocess.run", return_value=_cli_result(expand_data)) as mock_run: result = await agent.expand_task("1", num_subtasks=2) call_args = mock_run.call_args[0][0] assert "--id=1" in call_args assert "--num=2" in call_args assert "--force" in call_args assert result == expand_data # --- retry logic --- @pytest.mark.asyncio async def test_retry_succeeds_after_failures(agent): agent.base_delay = 0.01 # speed up test fail = _cli_result("", returncode=1, stderr="transient error") success = _cli_result({"tasks": []}) with patch("subprocess.run", side_effect=[fail, fail, success]): result = await agent.get_unblocked_tasks() assert result == [] @pytest.mark.asyncio async def test_retry_exhausted_raises(agent): agent.base_delay = 0.01 fail = _cli_result("", returncode=1, stderr="persistent error") with patch("subprocess.run", return_value=fail): with pytest.raises(RuntimeError, match="All 3 attempts failed"): await agent.get_unblocked_tasks() @pytest.mark.asyncio async def test_retry_exponential_backoff(agent): agent.base_delay = 0.01 fail = _cli_result("", returncode=1, stderr="error") success = _cli_result({"tasks": []}) delays = [] original_sleep = asyncio.sleep async def mock_sleep(duration): delays.append(duration) # don't actually sleep with patch("subprocess.run", side_effect=[fail, fail, success]): with patch("asyncio.sleep", side_effect=mock_sleep): await agent.get_unblocked_tasks() assert len(delays) == 2 assert delays[0] == pytest.approx(0.01) # base_delay * 2^0 assert delays[1] == pytest.approx(0.02) # base_delay * 2^1