# Source listing: ai_ops2/main.py
# Last modified: 2026-02-25 23:49:54 -05:00
# Size: 362 lines, 11 KiB (Python)
"""App Factory - Autonomous multi-agent orchestration framework.

Usage:
    python main.py --prompt "Build a video transcription service" --repo-path /path/to/project
    python main.py --prompt "Build a REST API" --max-concurrent-tasks 3 --debug
    python main.py --dry-run --prompt "Test project"
"""
import argparse
import asyncio
import logging
import os
import shutil
import signal
import subprocess
import sys
from datetime import datetime, timezone
from dotenv import load_dotenv
from app_factory.core.logging_utils import LevelColorFormatter
# ---------------------------------------------------------------------------
# Custom exceptions
# ---------------------------------------------------------------------------
class AppFactoryError(Exception):
    """Root of the App Factory exception hierarchy."""


class ClarificationTimeout(AppFactoryError):
    """Human input was required by a task, but waiting for it timed out."""


class DockerDaemonError(AppFactoryError):
    """The Docker daemon could not be reached."""


class GitError(AppFactoryError):
    """A git operation did not succeed."""


class MCPConnectionError(AppFactoryError):
    """Connecting to an MCP server failed."""


class ConfigurationError(AppFactoryError):
    """Configuration is missing or invalid."""
# Module-level logger under the "app_factory" name; used by
# validate_environment() and main() below.
logger = logging.getLogger("app_factory")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _positive_int(value):
    """argparse ``type=`` callable: parse *value* as an integer >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid int value: {value!r}"
        ) from None
    if number < 1:
        raise argparse.ArgumentTypeError(
            f"must be a positive integer, got {number}"
        )
    return number


def parse_args(argv=None):
    """Parse CLI arguments.

    Args:
        argv: Optional list of argument strings. When ``None``, argparse
            falls back to ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with ``prompt``, ``repo_path``,
        ``max_concurrent_tasks``, ``debug`` and ``dry_run`` attributes.

    Raises:
        SystemExit: Raised by argparse (exit status 2) when ``--prompt`` is
            missing or an argument is invalid, including a
            ``--max-concurrent-tasks`` value below 1.
    """
    parser = argparse.ArgumentParser(
        description="App Factory - Autonomous multi-agent orchestration framework.",
    )
    parser.add_argument(
        "--prompt",
        required=True,
        help="Project description for the App Factory to build.",
    )
    parser.add_argument(
        "--repo-path",
        default=os.getcwd(),
        help="Target repository path (default: current directory).",
    )
    parser.add_argument(
        "--max-concurrent-tasks",
        # Zero or negative parallelism is meaningless; reject it at the CLI
        # boundary with a clear usage error.
        type=_positive_int,
        default=5,
        help="Maximum parallel dev agents (default: 5).",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable verbose debug logging.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate configuration without executing.",
    )
    return parser.parse_args(argv)
def validate_environment():
    """Normalise credentials and check that Docker and git are usable.

    The Anthropic credentials are optional: when neither is set a warning is
    logged and execution continues. Non-empty credentials are written back to
    ``os.environ`` with all whitespace removed.

    Returns:
        dict with keys ``api_key``, ``auth_token``, ``langsmith_api_key`` and
        ``langsmith_project`` (the latter defaults to ``"app-factory"``).

    Raises:
        DockerDaemonError: If ``docker info`` cannot be launched, times out
            after 10 seconds, or exits with a non-zero status.
        GitError: If git is not available on PATH.
    """
    # API key (optional — Claude Code OAuth is also supported).
    # split()/join removes every whitespace character, not just the ends, so
    # stray newlines pasted into a .env value do not corrupt the credential.
    api_key = "".join(os.environ.get("ANTHROPIC_API_KEY", "").split())
    auth_token = "".join(os.environ.get("ANTHROPIC_AUTH_TOKEN", "").split())
    if api_key:
        os.environ["ANTHROPIC_API_KEY"] = api_key
    if auth_token:
        os.environ["ANTHROPIC_AUTH_TOKEN"] = auth_token
    if not api_key and not auth_token:
        logger.warning(
            "No ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN set; relying on Claude Code OAuth session."
        )

    # Docker: only the subprocess launch sits inside the try so that the
    # handlers below cover launch failures and nothing else.
    try:
        result = subprocess.run(
            ["docker", "info"],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except FileNotFoundError as exc:
        raise DockerDaemonError(
            "Docker not found on PATH. Install Docker and retry."
        ) from exc
    except subprocess.TimeoutExpired as exc:
        raise DockerDaemonError(
            "Docker daemon not responding. Start Docker Desktop and retry."
        ) from exc
    except OSError as exc:
        # e.g. PermissionError when the docker binary is not executable;
        # surface it as the documented error type instead of a raw traceback.
        raise DockerDaemonError(f"Unable to run docker: {exc}") from exc
    if result.returncode != 0:
        raise DockerDaemonError(
            "Docker daemon not running. Start Docker Desktop and retry."
        )

    # Git
    if not shutil.which("git"):
        raise GitError("git not found on PATH. Install git and retry.")

    return {
        "api_key": api_key,
        "auth_token": auth_token,
        "langsmith_api_key": os.environ.get("LANGSMITH_API_KEY", ""),
        "langsmith_project": os.environ.get("LANGSMITH_PROJECT", "app-factory"),
    }
# ---------------------------------------------------------------------------
# Orchestrator execution
# ---------------------------------------------------------------------------
async def run_factory(args, config: dict) -> dict:
    """Build every component, wire them into the orchestrator, and run it.

    Args:
        args: Parsed CLI namespace; ``repo_path`` and ``prompt`` are read
            directly, ``debug`` is read if present.
        config: Mapping as returned by ``validate_environment``.

    Returns:
        Whatever ``AppFactoryOrchestrator.run`` returns for ``args.prompt``.
    """
    # Project imports are performed inside the function, at call time.
    from app_factory.agents.dev_agent import DevAgentManager
    from app_factory.agents.pm_agent import PMAgent
    from app_factory.agents.qa_agent import QAAgent
    from app_factory.agents.task_agent import TaskMasterAgent
    from app_factory.core.architecture_tracker import ArchitectureTracker
    from app_factory.core.graph import AppFactoryOrchestrator
    from app_factory.core.observability import ObservabilityManager
    from app_factory.core.workspace import WorkspaceManager

    # Empty-string credentials are normalised to None.
    api_key = config.get("api_key") or None
    auth_token = config.get("auth_token") or None

    # Only a genuine bool is honoured as the debug flag; anything else is off.
    debug_flag = getattr(args, "debug", False)
    debug_enabled = isinstance(debug_flag, bool) and debug_flag

    # 1. Observability
    observability = ObservabilityManager(
        project_name=config.get("langsmith_project", "app-factory"),
    )

    # 2. Workspace
    workspace_manager = WorkspaceManager(repo_path=args.repo_path)

    # Keyword arguments shared by every LLM-backed component below.
    llm_settings = {
        "api_key": api_key,
        "auth_token": auth_token,
        "debug": debug_enabled,
        "observability": observability,
    }

    # 3. Architecture tracker
    # NOTE(review): constructed but not handed to the orchestrator in this
    # function — confirm whether that is intentional.
    arch_tracker = ArchitectureTracker(**llm_settings)

    # 4. Agents
    pm_agent = PMAgent(**llm_settings)
    task_agent = TaskMasterAgent(project_root=args.repo_path)
    dev_manager = DevAgentManager(
        docker_client=workspace_manager.docker_client,
        max_retries=3,
    )
    qa_agent = QAAgent(repo_path=args.repo_path, **llm_settings)

    # 5. Orchestrator
    orchestrator = AppFactoryOrchestrator(
        pm_agent=pm_agent,
        task_agent=task_agent,
        dev_manager=dev_manager,
        qa_agent=qa_agent,
        workspace_manager=workspace_manager,
        observability=observability,
    )

    # 6. Install the SIGINT/SIGTERM handlers (done by the constructor).
    GracefulShutdown(workspace_manager=workspace_manager)

    # 7. Execute
    return await orchestrator.run(args.prompt)
# ---------------------------------------------------------------------------
# Summary
# ---------------------------------------------------------------------------
def print_summary(result: dict, start_time: datetime) -> None:
    """Print the final execution summary to stdout.

    Args:
        result: Final orchestrator state. The keys ``completed_tasks``,
            ``tasks``, ``errors`` and ``iteration_count`` are read; missing
            keys and keys explicitly set to ``None`` count as empty.
        start_time: Timezone-aware UTC datetime at which the run started;
            the elapsed time is measured against ``datetime.now(timezone.utc)``.
    """
    elapsed = datetime.now(timezone.utc) - start_time
    minutes, seconds = divmod(int(elapsed.total_seconds()), 60)

    # ``or []`` also covers a key that is present but set to None, which
    # ``dict.get(key, [])`` alone would pass through to len().
    completed = result.get("completed_tasks") or []
    tasks = result.get("tasks") or []
    errors = result.get("errors") or []

    print("\n" + "=" * 60)
    print("APP FACTORY - Execution Summary")
    print("=" * 60)
    print(f" Tasks completed : {len(completed)} / {len(tasks)}")
    print(f" Execution time : {minutes}m {seconds}s")
    print(f" Iterations : {result.get('iteration_count', 0)}")
    if errors:
        print(f" Errors : {len(errors)}")
        # Show at most five errors, each truncated to 120 characters.
        for err in errors[:5]:
            # str() so exception objects or dicts in the list do not raise
            # TypeError when sliced.
            print(f" - {str(err)[:120]}")
        if len(errors) > 5:
            print(f" ... and {len(errors) - 5} more")
    langsmith_project = os.environ.get("LANGSMITH_PROJECT", "")
    if langsmith_project:
        print(f" LangSmith : project={langsmith_project}")
    print("=" * 60)
# ---------------------------------------------------------------------------
# Graceful shutdown
# ---------------------------------------------------------------------------
class GracefulShutdown:
    """Signal handler for SIGINT/SIGTERM with two-stage shutdown.

    Constructing an instance installs ``_handler`` for both signals. The
    first signal sets ``shutdown_requested`` and, when a workspace manager
    was supplied and an event loop is running, schedules its
    ``cleanup_all()`` coroutine on that loop. A second signal exits the
    process with status 1.
    """

    def __init__(self, workspace_manager=None):
        self.shutdown_requested = False
        self.workspace_manager = workspace_manager
        # Strong reference to the cleanup task so it cannot be garbage
        # collected before it finishes.
        self._cleanup_task = None
        signal.signal(signal.SIGINT, self._handler)
        signal.signal(signal.SIGTERM, self._handler)

    def _start_cleanup(self, loop):
        """Create the workspace cleanup task; runs inside the event loop."""
        self._cleanup_task = loop.create_task(self.workspace_manager.cleanup_all())

    def _handler(self, signum, frame):
        """Handle a signal: request shutdown first, force exit on repeat."""
        if self.shutdown_requested:
            print("\nForce exit.")
            sys.exit(1)
        self.shutdown_requested = True
        print("\nShutting down gracefully... (press Ctrl+C again to force)")
        if not self.workspace_manager:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: the async cleanup cannot be run from here,
            # so it is skipped.
            return
        # A handler installed with signal.signal() should not touch the loop
        # directly; call_soon_threadsafe hands the work over safely and wakes
        # the loop if it is idle.
        loop.call_soon_threadsafe(self._start_cleanup, loop)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Entry point: load env, parse args, configure logging, validate, run.

    Exit statuses: 1 for an environment validation failure, a clarification
    timeout, or any other fatal error; 130 when interrupted from the
    keyboard. A successful ``--dry-run`` returns normally after validation
    without running the factory.
    """
    load_dotenv()
    args = parse_args()

    # Logging: one stream handler with the colour formatter; force=True
    # replaces any handlers already attached to the root logger.
    level = logging.DEBUG if args.debug else logging.INFO
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        LevelColorFormatter(
            fmt="[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
            datefmt="%Y-%m-%dT%H:%M:%S",
            stream=stream_handler.stream,
        )
    )
    logging.basicConfig(
        level=level,
        handlers=[stream_handler],
        force=True,
    )

    # Validate or dry-run
    try:
        config = validate_environment()
    except ConfigurationError as e:
        print(f"Configuration error: {e}", file=sys.stderr)
        sys.exit(1)
    except DockerDaemonError as e:
        print(f"Docker error: {e}", file=sys.stderr)
        sys.exit(1)
    except GitError as e:
        print(f"Git error: {e}", file=sys.stderr)
        sys.exit(1)

    if args.dry_run:
        print("Dry-run: configuration is valid. All checks passed.")
        return

    # Run
    start_time = datetime.now(timezone.utc)
    try:
        result = asyncio.run(run_factory(args, config))
        print_summary(result, start_time)
    except ClarificationTimeout as e:
        # The previous message told users to re-run with --interactive, but
        # parse_args() defines no such option, so following that advice only
        # produced an argparse usage error.
        print(f"Clarification timeout: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        print("\nInterrupted.", file=sys.stderr)
        sys.exit(130)
    except Exception as e:
        # Top-level boundary: log the traceback, then exit non-zero.
        logger.exception("Fatal error")
        print(f"Fatal error: {e}", file=sys.stderr)
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()