137 lines
4.8 KiB
Python
137 lines
4.8 KiB
Python
"""Project Manager Agent - Expands user prompts into structured PRDs and handles clarification requests."""
|
|
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from app_factory.core.claude_client import ClaudeSDKClient
|
|
|
|
|
|
class PMAgent:
    """Agent responsible for PRD generation, clarification handling, and project planning.

    Wraps a ``ClaudeSDKClient`` and keeps a running total of the input and
    output tokens reported by every completion it performs.
    """

    # Identifiers shared by every completion / observability call of this agent.
    _AGENT_NAME = "pm_agent"
    _MAX_TURNS = 100
    # Marker the model emits when the question must be answered by a human.
    _ESCALATION_MARKER = "ESCALATE_TO_HUMAN"

    def __init__(
        self,
        api_key: str = None,
        auth_token: str = None,
        model: str = "claude-opus-4-6",
        debug: bool = False,
        observability=None,
    ):
        """Create the agent and its underlying Claude client.

        Args:
            api_key: API key; when falsy, the ``ANTHROPIC_API_KEY``
                environment variable is used instead.
            auth_token: Auth token; when falsy, the ``ANTHROPIC_AUTH_TOKEN``
                environment variable is used instead.
            model: Model identifier passed to every completion call.
            debug: Forwarded to the client as ``enable_debug``.
            observability: Optional object. When truthy it is forwarded to
                the client and its ``log_token_usage`` method is called
                after each completion.
        """
        self.model = model
        self.input_tokens = 0
        self.output_tokens = 0
        # Templates live in the "prompts" directory two levels above this file.
        self._prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
        self.observability = observability

        resolved_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        resolved_auth = auth_token or os.environ.get("ANTHROPIC_AUTH_TOKEN")
        self.client = ClaudeSDKClient(
            api_key=resolved_key,
            auth_token=resolved_auth,
            enable_debug=debug,
        )

    def _load_template(self, template_name: str) -> str:
        """Load a prompt template file from app_factory/prompts/.

        The file is decoded as UTF-8 explicitly so the result does not depend
        on the platform's locale encoding.
        """
        path = self._prompts_dir / template_name
        return path.read_text(encoding="utf-8")

    def _record_usage(self, task_id: str, response) -> None:
        """Add a response's token counts to the totals and report them.

        Args:
            task_id: Task label passed to ``observability.log_token_usage``.
            response: Completion result exposing ``input_tokens`` and
                ``output_tokens``.
        """
        self.input_tokens += response.input_tokens
        self.output_tokens += response.output_tokens
        if self.observability:
            self.observability.log_token_usage(
                self._AGENT_NAME,
                task_id,
                input_tokens=response.input_tokens,
                output_tokens=response.output_tokens,
                model=self.model,
            )

    async def expand_prompt_to_prd(self, user_input: str) -> str:
        """Expand a user prompt into a structured PRD using Claude.

        The ``pm_prd_expansion.txt`` template is sent as the system prompt
        and ``user_input`` as the prompt; token usage is accumulated.

        Returns markdown with sections: Objective, Core Requirements,
        Technical Architecture, Tech Stack, Success Criteria, Non-Functional Requirements.
        """
        system_prompt = self._load_template("pm_prd_expansion.txt")
        task_id = "expand_prd"

        response = await self.client.complete(
            prompt=user_input,
            model=self.model,
            system_prompt=system_prompt,
            max_turns=self._MAX_TURNS,
            observability=self.observability,
            agent_name=self._AGENT_NAME,
            task_id=task_id,
        )

        self._record_usage(task_id, response)
        return response.text

    async def handle_clarification_request(self, clarification: dict) -> str:
        """Handle a clarification request from a downstream agent.

        The ``pm_clarification.txt`` template is filled in with the request
        fields and sent to the model. If the stripped reply contains
        ``ESCALATE_TO_HUMAN``, the question is put to the user on stdin.

        Args:
            clarification: dict with keys requesting_agent, task_id, question, context.
                Missing keys fall back to defaults.

        Returns:
            Clarification response string. If the question requires human input,
            prompts the user and returns their answer.
        """
        import asyncio  # local import: only needed for the escalation path

        task_ref = clarification.get("task_id", "N/A")
        template = self._load_template("pm_clarification.txt")
        prompt = template.format(
            requesting_agent=clarification.get("requesting_agent", "unknown"),
            task_id=task_ref,
            question=clarification.get("question", ""),
            context=clarification.get("context", ""),
        )
        task_id = f"clarification:{task_ref}"

        response = await self.client.complete(
            prompt=prompt,
            model=self.model,
            max_turns=self._MAX_TURNS,
            observability=self.observability,
            agent_name=self._AGENT_NAME,
            task_id=task_id,
        )

        self._record_usage(task_id, response)

        answer = response.text.strip()
        if self._ESCALATION_MARKER not in answer:
            return answer

        human_prompt = (
            f"[PMAgent] Clarification needed for {clarification.get('requesting_agent', 'agent')} "
            f"(task {task_ref}): "
            f"{clarification.get('question', '')}\n> "
        )
        # input() blocks until the user answers; run it in the default
        # executor so the event loop is not stalled while waiting.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, input, human_prompt)

    def update_prd(self, prd_path: str, updates: str) -> None:
        """Append updates to an existing PRD file with a versioned header.

        The header is a markdown rule followed by ``## PRD Update - <UTC
        timestamp>``. The file is opened in append mode as UTF-8.

        Args:
            prd_path: Path of the PRD file to append to.
            updates: Text written after the header.
        """
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        header = f"\n\n---\n## PRD Update - {timestamp}\n\n"

        with open(prd_path, "a", encoding="utf-8") as f:
            f.write(header)
            f.write(updates)

    def get_token_usage(self) -> dict:
        """Return cumulative token usage.

        Returns:
            dict with ``input_tokens``, ``output_tokens`` and their sum as
            ``total_tokens``.
        """
        return {
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "total_tokens": self.input_tokens + self.output_tokens,
        }
|