270 lines
10 KiB
Python
270 lines
10 KiB
Python
"""Tests for WorkspaceManager."""
|
|
|
|
import os
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import docker as docker_mod
|
|
import git
|
|
import pytest
|
|
|
|
from app_factory.core.workspace import (
|
|
DockerProvisionError,
|
|
GitWorktreeError,
|
|
WorkspaceError,
|
|
WorkspaceManager,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def temp_git_repo(tmp_path):
    """Build a real git repository under ``tmp_path`` and return its path.

    The repository lives in ``tmp_path / "repo"``, contains a single
    committed ``README.md``, and has its active branch renamed to ``main``
    when it was initialised under a different name.

    Returns:
        The repository directory as a ``str``.
    """
    root = tmp_path / "repo"
    root.mkdir()
    repository = git.Repo.init(root)

    # One committed file so that the repository has a branch to work from.
    (root / "README.md").write_text("# Test Repo")
    repository.index.add(["README.md"])
    repository.index.commit("Initial commit")

    # The tests rely on a branch called "main"; force-rename if needed.
    current_branch = repository.active_branch.name
    if current_branch != "main":
        repository.git.branch("-M", "main")

    return str(root)
|
|
|
|
|
|
@pytest.fixture
def mock_docker_client():
    """Return a ``MagicMock`` standing in for a Docker client.

    ``images.pull`` returns a fresh mock, and ``containers.create`` returns
    a mock container whose ``id`` is ``"abc123container"``.
    """
    container = MagicMock()
    container.id = "abc123container"

    client = MagicMock()
    client.images.pull.return_value = MagicMock()
    client.containers.create.return_value = container
    return client
|
|
|
|
|
|
@pytest.fixture
def workspace_manager(temp_git_repo, mock_docker_client):
    """Return a ``WorkspaceManager`` built on the temp repo with Docker mocked.

    ``docker.from_env`` (as referenced by ``app_factory.core.workspace``) is
    patched to hand back ``mock_docker_client`` while the manager is being
    constructed.
    """
    docker_patch = patch(
        "app_factory.core.workspace.docker.from_env",
        return_value=mock_docker_client,
    )
    with docker_patch:
        return WorkspaceManager(temp_git_repo)
|
|
|
|
|
|
class TestWorkspaceManagerInit:
    """Constructor behaviour of ``WorkspaceManager`` with ``docker.from_env`` patched."""

    # Dotted path of the Docker client factory that every test here replaces.
    _FROM_ENV = "app_factory.core.workspace.docker.from_env"

    def test_init_valid_repo(self, temp_git_repo, mock_docker_client):
        """A valid repo gives a resolved path, the default image and no workspaces."""
        with patch(self._FROM_ENV, return_value=mock_docker_client):
            manager = WorkspaceManager(temp_git_repo)
            expected_path = Path(temp_git_repo).resolve()
            assert manager.repo_path == expected_path
            assert manager.docker_image == "python:3.11-slim"
            assert manager.active_workspaces == {}

    def test_init_custom_image(self, temp_git_repo, mock_docker_client):
        """The ``docker_image`` keyword is stored on the manager."""
        with patch(self._FROM_ENV, return_value=mock_docker_client):
            manager = WorkspaceManager(temp_git_repo, docker_image="node:20-slim")
            assert manager.docker_image == "node:20-slim"

    def test_init_invalid_repo(self, tmp_path, mock_docker_client):
        """A directory that is not a git repository raises ``GitWorktreeError``."""
        docker_patch = patch(self._FROM_ENV, return_value=mock_docker_client)
        expected_error = pytest.raises(GitWorktreeError, match="Invalid git repository")
        with docker_patch, expected_error:
            WorkspaceManager(str(tmp_path))

    def test_init_nonexistent_path(self, mock_docker_client):
        """A path that does not exist raises ``GitWorktreeError``."""
        docker_patch = patch(self._FROM_ENV, return_value=mock_docker_client)
        expected_error = pytest.raises(GitWorktreeError, match="Repository path not found")
        with docker_patch, expected_error:
            WorkspaceManager("/nonexistent/path/to/repo")

    def test_init_docker_unavailable(self, temp_git_repo):
        """A ``DockerException`` from ``from_env`` surfaces as ``DockerProvisionError``."""
        connection_failure = docker_mod.errors.DockerException("Connection refused")
        docker_patch = patch(self._FROM_ENV, side_effect=connection_failure)
        expected_error = pytest.raises(
            DockerProvisionError, match="Failed to connect to Docker"
        )
        with docker_patch, expected_error:
            WorkspaceManager(temp_git_repo)
|
|
|
|
|
|
@pytest.mark.asyncio
class TestCreateWorktree:
    """Behaviour of ``WorkspaceManager.create_worktree`` against a real repo."""

    async def test_create_worktree_success(self, workspace_manager):
        """A new worktree directory and its ``feature/task-<id>`` branch are created."""
        manager = workspace_manager
        worktree_path = await manager.create_worktree("task-001")

        assert os.path.isdir(worktree_path)
        assert "task-001" in worktree_path
        branch_names = [branch.name for branch in manager.repo.branches]
        assert "feature/task-task-001" in branch_names

        # Cleanup: detach the worktree from the repository again.
        manager.repo.git.worktree("remove", worktree_path, "--force")

    async def test_create_worktree_invalid_base_branch(self, workspace_manager):
        """An unknown ``base_branch`` raises ``GitWorktreeError``."""
        manager = workspace_manager
        expected_error = pytest.raises(GitWorktreeError, match="does not exist")
        with expected_error:
            await manager.create_worktree("task-002", base_branch="nonexistent")

    async def test_create_worktree_branch_already_exists(self, workspace_manager):
        """A pre-existing feature branch for the task raises ``GitWorktreeError``."""
        manager = workspace_manager
        # Create the branch name the manager is expected to pick for this task.
        manager.repo.git.branch("feature/task-task-003")
        expected_error = pytest.raises(GitWorktreeError, match="Branch already exists")
        with expected_error:
            await manager.create_worktree("task-003")

    async def test_create_worktree_path_already_exists(self, workspace_manager):
        """A pre-existing worktree directory raises ``GitWorktreeError``."""
        manager = workspace_manager
        # Occupy the sibling "worktrees/<task>" directory before the call.
        occupied = manager.repo_path.parent / "worktrees" / "task-004"
        occupied.mkdir(parents=True)
        expected_error = pytest.raises(
            GitWorktreeError, match="Worktree path already exists"
        )
        with expected_error:
            await manager.create_worktree("task-004")
|
|
|
|
|
|
@pytest.mark.asyncio
class TestSpinUpCleanRoom:
    """Behaviour of ``WorkspaceManager.spin_up_clean_room`` with Docker mocked."""

    async def test_spin_up_clean_room_success(self, workspace_manager, mock_docker_client):
        """The image is pulled, a container is created, and the workspace is recorded."""
        manager = workspace_manager
        container = await manager.spin_up_clean_room("/tmp/fake_worktree", "task-010")

        assert container.id == "abc123container"
        mock_docker_client.images.pull.assert_called_once_with("python:3.11-slim")

        # Exact keyword arguments expected on the single containers.create call.
        expected_create_kwargs = {
            "image": "python:3.11-slim",
            "name": "appfactory-task-task-010",
            "volumes": {"/tmp/fake_worktree": {"bind": "/workspace", "mode": "rw"}},
            "working_dir": "/workspace",
            "network_mode": "none",
            "auto_remove": False,
            "detach": True,
            "command": "sleep infinity",
        }
        mock_docker_client.containers.create.assert_called_once_with(
            **expected_create_kwargs
        )

        assert "task-010" in manager.active_workspaces
        record = manager.active_workspaces["task-010"]
        assert record["worktree_path"] == "/tmp/fake_worktree"
        assert record["container_id"] == "abc123container"

    async def test_spin_up_image_pull_failure(self, workspace_manager, mock_docker_client):
        """An ``APIError`` from ``images.pull`` surfaces as ``DockerProvisionError``."""
        manager = workspace_manager
        pull_error = docker_mod.errors.APIError("pull failed")
        mock_docker_client.images.pull.side_effect = pull_error
        expected_error = pytest.raises(DockerProvisionError, match="Failed to pull image")
        with expected_error:
            await manager.spin_up_clean_room("/tmp/fake", "task-011")

    async def test_spin_up_container_create_failure(self, workspace_manager, mock_docker_client):
        """An ``APIError`` from ``containers.create`` surfaces as ``DockerProvisionError``."""
        manager = workspace_manager
        create_error = docker_mod.errors.APIError("create failed")
        mock_docker_client.containers.create.side_effect = create_error
        expected_error = pytest.raises(
            DockerProvisionError, match="Failed to create container"
        )
        with expected_error:
            await manager.spin_up_clean_room("/tmp/fake", "task-012")
|
|
|
|
|
|
@pytest.mark.asyncio
class TestCleanupWorkspace:
    """Behaviour of ``WorkspaceManager.cleanup_workspace``."""

    @staticmethod
    def _register(manager, task_id, worktree_path, container_id, container):
        """Insert a workspace record for ``task_id`` straight into the registry."""
        manager.active_workspaces[task_id] = {
            "task_id": task_id,
            "worktree_path": worktree_path,
            "container_id": container_id,
            "container": container,
        }

    async def test_cleanup_workspace_success(self, workspace_manager):
        """The container is stopped and removed, and the worktree and record vanish."""
        manager = workspace_manager
        worktree_path = await manager.create_worktree("task-020")
        container = MagicMock()
        self._register(manager, "task-020", worktree_path, "cont123", container)

        await manager.cleanup_workspace("task-020")

        container.stop.assert_called_once_with(timeout=5)
        container.remove.assert_called_once_with(force=True)
        assert "task-020" not in manager.active_workspaces
        assert not os.path.exists(worktree_path)

    async def test_cleanup_already_stopped_container(self, workspace_manager):
        """A failing ``stop`` does not prevent the workspace from being dropped."""
        manager = workspace_manager
        container = MagicMock()
        container.stop.side_effect = Exception("already stopped")
        container.remove.return_value = None
        self._register(manager, "task-021", "/tmp/nonexistent", "cont456", container)

        # Should not raise even if stop fails
        await manager.cleanup_workspace("task-021")

        assert "task-021" not in manager.active_workspaces

    async def test_cleanup_no_container(self, workspace_manager):
        """A record without a container is still removed from the registry."""
        manager = workspace_manager
        worktree_path = await manager.create_worktree("task-022")
        self._register(manager, "task-022", worktree_path, None, None)

        await manager.cleanup_workspace("task-022")

        assert "task-022" not in manager.active_workspaces
|
|
|
|
|
|
class TestGetActiveWorkspaces:
    """Behaviour of ``WorkspaceManager.get_active_workspaces``."""

    def test_get_empty(self, workspace_manager):
        """A fresh manager reports an empty list."""
        listing = workspace_manager.get_active_workspaces()
        assert listing == []

    def test_get_with_workspaces(self, workspace_manager):
        """Each record is listed without its ``container`` entry."""
        manager = workspace_manager
        manager.active_workspaces.update(
            {
                "t1": {
                    "task_id": "t1",
                    "worktree_path": "/path/t1",
                    "container_id": "c1",
                    "container": MagicMock(),
                },
                "t2": {
                    "task_id": "t2",
                    "worktree_path": "/path/t2",
                    "container_id": "c2",
                    "container": MagicMock(),
                },
            }
        )

        listing = manager.get_active_workspaces()

        assert len(listing) == 2
        expected_entries = (
            {"task_id": "t1", "worktree_path": "/path/t1", "container_id": "c1"},
            {"task_id": "t2", "worktree_path": "/path/t2", "container_id": "c2"},
        )
        for expected in expected_entries:
            assert expected in listing
        # The container object itself must not appear in the listing.
        for entry in listing:
            assert "container" not in entry
|
|
|
|
|
|
@pytest.mark.asyncio
class TestCleanupAll:
    """Behaviour of ``WorkspaceManager.cleanup_all``."""

    async def test_cleanup_all_success(self, workspace_manager):
        """Every registered container is stopped once and the registry is emptied."""
        manager = workspace_manager
        first_container = MagicMock()
        second_container = MagicMock()
        manager.active_workspaces.update(
            {
                "t1": {
                    "task_id": "t1",
                    "worktree_path": "/tmp/nonexistent1",
                    "container_id": "c1",
                    "container": first_container,
                },
                "t2": {
                    "task_id": "t2",
                    "worktree_path": "/tmp/nonexistent2",
                    "container_id": "c2",
                    "container": second_container,
                },
            }
        )

        await manager.cleanup_all()

        for container in (first_container, second_container):
            container.stop.assert_called_once()
        assert manager.active_workspaces == {}

    async def test_cleanup_all_with_errors(self, workspace_manager):
        """Container failures raise ``WorkspaceError`` but the registry is still emptied."""
        manager = workspace_manager
        failing_container = MagicMock()
        failing_container.stop.side_effect = Exception("stop failed")
        failing_container.remove.side_effect = Exception("remove failed")
        manager.active_workspaces["t1"] = {
            "task_id": "t1",
            "worktree_path": "/tmp/nonexistent",
            "container_id": "c1",
            "container": failing_container,
        }

        expected_error = pytest.raises(
            WorkspaceError, match="Cleanup all completed with errors"
        )
        with expected_error:
            await manager.cleanup_all()

        assert manager.active_workspaces == {}
|