"""Observability Manager - LangSmith tracing, logging, and monitoring."""

import contextlib
import functools
import inspect
import json
import logging
import os
import re
import time
import traceback
import uuid
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Callable, Optional

from app_factory.core.logging_utils import (
    FG_BLUE,
    FG_CYAN,
    FG_MAGENTA,
    LEVEL_COLORS,
    colorize,
    should_use_color,
)


class _StructuredFormatter(logging.Formatter):
    """Custom formatter: [ISO_TIMESTAMP] [AGENT] [TASK] [LEVEL] message

    ``agent_name`` and ``task_id`` are read from attributes supplied via the
    logging call's ``extra`` mapping and default to ``"SYSTEM"`` / ``"-"``.
    Exception and stack information attached to the record is appended on the
    lines following the structured line.
    """

    # Message prefixes that receive a dedicated color when color output is on.
    _EVENT_COLORS = {
        "State transition": FG_MAGENTA,
        "Token usage": FG_BLUE,
        "Claude event": FG_BLUE,
        "Trace started": FG_CYAN,
        "Trace ended": FG_CYAN,
    }

    def __init__(self, use_color: Optional[bool] = None):
        super().__init__()
        # should_use_color() makes the final decision; None presumably means
        # "auto-detect" (TODO confirm in logging_utils).
        self._use_color = should_use_color(use_color=use_color)

    def _colorize_message(self, message: str) -> str:
        """Color *message* using the first matching event prefix, if any."""
        for prefix, style in self._EVENT_COLORS.items():
            if message.startswith(prefix):
                return colorize(message, style, self._use_color)
        return message

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* as one structured line, plus any traceback/stack."""
        # Use the record's creation time rather than "now": formatting can lag
        # the logging call, and the timestamp should describe the event.
        ts = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%S"
        )
        agent = getattr(record, "agent_name", "SYSTEM")
        task = getattr(record, "task_id", "-")
        level = record.levelname
        message = record.getMessage()
        if self._use_color:
            ts = colorize(ts, FG_BLUE, enabled=True)
            agent = colorize(agent, FG_CYAN, enabled=True)
            task = colorize(task, FG_MAGENTA, enabled=True)
            level = colorize(level, LEVEL_COLORS.get(record.levelno, ""), enabled=True)
            message = self._colorize_message(message)
        line = f"[{ts}] [{agent}] [{task}] [{level}] {message}"

        # logging.Formatter.format() normally appends these. Because format()
        # is fully overridden here, do it explicitly, otherwise
        # logger.exception(...) / stack_info=True silently lose the traceback.
        if record.exc_info and not record.exc_text:
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            line = f"{line}\n{record.exc_text}"
        if record.stack_info:
            line = f"{line}\n{self.formatStack(record.stack_info)}"
        return line


class _TraceContext:
    """Async context manager for trace_context().

    Starts a trace on entry (yielding its run_id) and ends it on exit,
    recording ``"<ExcType>: <message>"`` as the error if the body raised.
    Exceptions are never suppressed.
    """

    def __init__(self, manager: "ObservabilityManager", agent_name: str, task_id: str):
        self._manager = manager
        self._agent_name = agent_name
        self._task_id = task_id
        self._run_id: Optional[str] = None

    async def __aenter__(self) -> str:
        self._run_id = self._manager.start_trace(self._agent_name, self._task_id)
        return self._run_id

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_val is not None:
            self._manager.end_trace(
                self._run_id, error=f"{exc_type.__name__}: {exc_val}"
            )
        else:
            self._manager.end_trace(self._run_id)
        return False  # do not suppress exceptions


class ObservabilityManager:
    """Wraps LangSmith client for tracing and structured logging.

    LangSmith is optional: if the client cannot be imported or constructed,
    tracing calls degrade to local logging plus in-memory metrics.
    """

    # Claude event types emitted per verbosity mode. ``None`` means "log every
    # event type"; an empty set means "log nothing".
    _CLAUDE_EVENT_FILTERS = {
        "quiet": {
            "request_start",
            "request_error",
            "request_complete",
            "tool_use",
            "tool_result",
        },
        "focused": {
            "request_start",
            "request_error",
            "request_complete",
            "tool_use",
            "tool_result",
            "thinking_block",
            "result_message",
        },
        "verbose": None,  # no filtering
        "off": set(),
    }

    def __init__(
        self,
        project_name: Optional[str] = None,
        claude_event_mode: str | None = None,
    ):
        """Create the manager.

        Args:
            project_name: LangSmith project / logger suffix. Falls back to the
                ``LANGSMITH_PROJECT`` env var, then ``"app-factory"``.
            claude_event_mode: One of the ``_CLAUDE_EVENT_FILTERS`` keys. Falls
                back to ``APP_FACTORY_CLAUDE_EVENT_MODE``, then ``"quiet"``;
                unknown values become ``"focused"``.
        """
        self.project_name = project_name or os.getenv("LANGSMITH_PROJECT", "app-factory")
        requested_mode = (
            claude_event_mode or os.getenv("APP_FACTORY_CLAUDE_EVENT_MODE", "quiet")
        )
        normalized_mode = (
            requested_mode.strip().lower() if isinstance(requested_mode, str) else "focused"
        )
        self._claude_event_mode = (
            normalized_mode if normalized_mode in self._CLAUDE_EVENT_FILTERS else "focused"
        )

        # --- LangSmith client (optional) ---
        self._client = None
        try:
            from langsmith import Client  # noqa: F811

            self._client = Client()
        except Exception as exc:
            # LangSmith not configured or unreachable -- degrade gracefully
            logging.getLogger(__name__).warning(
                "LangSmith unavailable (%s). Tracing disabled.", exc
            )

        # --- Structured logger ---
        # The logger is shared per project name; only attach a handler once so
        # repeated construction does not duplicate output lines.
        self.logger = logging.getLogger(f"app_factory.{self.project_name}")
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(_StructuredFormatter())
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False

        # tool_use_id -> tool name / input summary, recorded on "tool_use" and
        # consumed by the matching "tool_result" event.
        self._tool_name_by_use_id: dict[str, str] = {}
        self._tool_summary_by_use_id: dict[str, str] = {}

        # --- Internal metrics ---
        self._active_runs: dict[str, dict] = {}
        self._metrics = {
            "total_tokens": 0,
            "total_traces": 0,
            "total_errors": 0,
            "total_claude_events": 0,
            "total_tool_calls": 0,
            "per_agent": defaultdict(lambda: {
                "tokens": 0,
                "traces": 0,
                "errors": 0,
                "claude_events": 0,
                "tool_calls": 0,
            }),
        }

    # ------------------------------------------------------------------
    # Tracing
    # ------------------------------------------------------------------

    def start_trace(self, agent_name: str, task_id: str, inputs: Optional[dict] = None) -> str:
        """Start a new trace run, return run_id.

        The run is always tracked locally. It is also created in LangSmith when
        a client is available; remote failures are logged as warnings only.
        """
        run_id = uuid.uuid4().hex
        self._metrics["total_traces"] += 1
        self._metrics["per_agent"][agent_name]["traces"] += 1
        self._active_runs[run_id] = {
            "agent_name": agent_name,
            "task_id": task_id,
            "start_time": time.time(),
        }
        self.logger.info(
            "Trace started: run_id=%s",
            run_id,
            extra={"agent_name": agent_name, "task_id": task_id},
        )
        try:
            if self._client is not None:
                self._client.create_run(
                    name=f"{agent_name}:{task_id}",
                    run_type="chain",
                    inputs=inputs or {},
                    id=run_id,
                    project_name=self.project_name,
                )
        except Exception as exc:
            self.logger.warning(
                "LangSmith create_run failed: %s",
                exc,
                extra={"agent_name": agent_name, "task_id": task_id},
            )
        return run_id

    def end_trace(self, run_id: str, outputs: Optional[dict] = None, error: Optional[str] = None):
        """End a trace run with outputs or error.

        Unknown run ids are attributed to agent ``"unknown"``. A truthy
        *error* counts toward the error metrics and is logged at ERROR level.
        """
        run_info = self._active_runs.pop(run_id, {})
        agent_name = run_info.get("agent_name", "unknown")
        task_id = run_info.get("task_id", "-")

        if error:
            self._metrics["total_errors"] += 1
            self._metrics["per_agent"][agent_name]["errors"] += 1
            self.logger.error(
                "Trace error: run_id=%s error=%s",
                run_id,
                error,
                extra={"agent_name": agent_name, "task_id": task_id},
            )
        else:
            self.logger.info(
                "Trace ended: run_id=%s",
                run_id,
                extra={"agent_name": agent_name, "task_id": task_id},
            )

        try:
            if self._client is not None:
                update_kwargs: dict[str, Any] = {"end_time": datetime.now(timezone.utc)}
                if outputs:
                    update_kwargs["outputs"] = outputs
                if error:
                    update_kwargs["error"] = error
                self._client.update_run(run_id, **update_kwargs)
        except Exception as exc:
            self.logger.warning(
                "LangSmith update_run failed: %s",
                exc,
                extra={"agent_name": agent_name, "task_id": task_id},
            )

    # ------------------------------------------------------------------
    # Decorator
    # ------------------------------------------------------------------

    def trace_agent_execution(self, agent_name: str, task_id: str):
        """Decorator for tracking agent calls with context.

        Works on both sync and coroutine functions. Each call opens a trace
        whose inputs are the stringified args/kwargs. On success the trace is
        closed with the stringified result; on failure it is closed with
        ``"<ExcType>: <message>"`` and the exception is re-raised.
        """

        def decorator(func: Callable):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                run_id = self.start_trace(
                    agent_name, task_id, inputs={"args": str(args), "kwargs": str(kwargs)}
                )
                try:
                    result = await func(*args, **kwargs)
                    self.end_trace(run_id, outputs={"result": str(result)})
                    return result
                except Exception as exc:
                    self.end_trace(run_id, error=f"{type(exc).__name__}: {exc}")
                    raise

            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                run_id = self.start_trace(
                    agent_name, task_id, inputs={"args": str(args), "kwargs": str(kwargs)}
                )
                try:
                    result = func(*args, **kwargs)
                    self.end_trace(run_id, outputs={"result": str(result)})
                    return result
                except Exception as exc:
                    self.end_trace(run_id, error=f"{type(exc).__name__}: {exc}")
                    raise

            if inspect.iscoroutinefunction(func):
                return async_wrapper
            return sync_wrapper

        return decorator

    # ------------------------------------------------------------------
    # Async helpers
    # ------------------------------------------------------------------

    async def trace_agent(self, agent_name: str, task_id: str, func: Callable):
        """Async helper to run a function within a trace context.

        *func* is called with no arguments and its result is awaited.
        Exceptions are recorded on the trace and re-raised.
        """
        run_id = self.start_trace(agent_name, task_id)
        try:
            result = await func()
            self.end_trace(run_id, outputs={"result": str(result)})
            return result
        except Exception as exc:
            self.end_trace(run_id, error=f"{type(exc).__name__}: {exc}")
            raise

    def trace_context(self, agent_name: str, task_id: str) -> _TraceContext:
        """Return an async context manager for tracing.

        Usage::

            async with obs.trace_context("agent", "task_id") as run_id:
                ...
        """
        return _TraceContext(self, agent_name, task_id)

    # ------------------------------------------------------------------
    # Logging helpers
    # ------------------------------------------------------------------

    def log_state_transition(self, from_state: str, to_state: str, metadata: Optional[dict] = None):
        """Log a state machine transition."""
        msg = f"State transition: {from_state} -> {to_state}"
        if metadata:
            msg += f" metadata={metadata}"
        self.logger.info(msg, extra={"agent_name": "STATE_MACHINE", "task_id": "-"})

    def log_token_usage(
        self,
        agent_name: str,
        task_id: str,
        input_tokens: int,
        output_tokens: int,
        model: Optional[str] = None,
    ):
        """Log token usage for cost monitoring and add it to the token metrics."""
        total = input_tokens + output_tokens
        self._metrics["total_tokens"] += total
        self._metrics["per_agent"][agent_name]["tokens"] += total
        msg = f"Token usage: input={input_tokens} output={output_tokens} total={total}"
        if model:
            msg += f" model={model}"
        self.logger.info(msg, extra={"agent_name": agent_name, "task_id": task_id})

    def log_error(self, agent_name: str, task_id: str, error: Exception, context: Optional[dict] = None):
        """Log an error with full stack trace and count it in the error metrics."""
        self._metrics["total_errors"] += 1
        self._metrics["per_agent"][agent_name]["errors"] += 1
        tb = traceback.format_exception(type(error), error, error.__traceback__)
        msg = f"Error: {error}\n{''.join(tb)}"
        if context:
            msg += f" context={context}"
        self.logger.error(msg, extra={"agent_name": agent_name, "task_id": task_id})

    def log_claude_event(
        self,
        agent_name: str,
        task_id: str,
        event_type: str,
        payload: dict | None = None,
    ):
        """Log a Claude SDK/CLI event in structured form.

        Every event is counted, whether or not it is emitted under the current
        mode. For ``tool_use`` events the tool name and an input summary are
        remembered by ``tool_use_id``. A later ``tool_result`` with the same id
        gets them filled in if the result payload lacks them. Emitted events
        are logged at DEBUG level.
        """
        self._metrics["total_claude_events"] += 1
        self._metrics["per_agent"][agent_name]["claude_events"] += 1
        normalized_event = (event_type or "unknown").strip().lower()
        # Copy so the caller's payload is never mutated.
        normalized_payload = dict(payload or {})

        if normalized_event == "tool_use":
            self._metrics["total_tool_calls"] += 1
            self._metrics["per_agent"][agent_name]["tool_calls"] += 1
            tool_use_id = normalized_payload.get("tool_use_id")
            tool_name = normalized_payload.get("tool_name")
            tool_input = normalized_payload.get("tool_input")
            if isinstance(tool_use_id, str) and isinstance(tool_name, str):
                self._tool_name_by_use_id[tool_use_id] = tool_name
                self._tool_summary_by_use_id[tool_use_id] = self._summarize_tool_input(
                    str(tool_name),
                    tool_input,
                )

        if normalized_event == "tool_result":
            tool_use_id = normalized_payload.get("tool_use_id")
            if isinstance(tool_use_id, str):
                tool_name = self._tool_name_by_use_id.pop(tool_use_id, None)
                tool_summary = self._tool_summary_by_use_id.pop(tool_use_id, None)
                if "tool_name" not in normalized_payload and tool_name:
                    normalized_payload["tool_name"] = tool_name
                if "tool_input_summary" not in normalized_payload and tool_summary:
                    normalized_payload["tool_input_summary"] = tool_summary

        if not self._should_log_claude_event(normalized_event):
            return

        msg = self._format_claude_event_message(normalized_event, normalized_payload)
        if not msg:
            # Formatter returns "" for events deliberately suppressed
            # (e.g. successful results of noisy tools).
            return
        self.logger.debug(msg, extra={"agent_name": agent_name, "task_id": task_id})

    def _should_log_claude_event(self, event_type: str) -> bool:
        """Return True when *event_type* is emitted under the current mode."""
        allowed = self._CLAUDE_EVENT_FILTERS.get(self._claude_event_mode)
        if allowed is None:
            return True
        return event_type in allowed

    def _format_claude_event_message(self, event_type: str, payload: dict[str, Any]) -> str:
        """Build the log line for a Claude event; "" means "do not log"."""
        session_id = payload.get("session_id")
        session_suffix = f" session={session_id}" if session_id else ""

        if event_type == "request_start":
            model = payload.get("model") or "default"
            prompt_chars = payload.get("prompt_chars", 0)
            return f"Claude request started: model={model} prompt_chars={prompt_chars}{session_suffix}"

        if event_type == "request_complete":
            inp = payload.get("input_tokens", 0)
            out = payload.get("output_tokens", 0)
            subtype = payload.get("result_subtype") or "unknown"
            preview = self._shorten_text(payload.get("result_preview"), max_chars=140)
            preview_fragment = f' result="{preview}"' if preview else ""
            return (
                f"Claude request completed: subtype={subtype} "
                f"tokens={inp}->{out}{preview_fragment}{session_suffix}"
            )

        if event_type == "request_error":
            err = self._shorten_text(payload.get("error"))
            retrying = payload.get("retrying")
            retry_fragment = " retrying=true" if retrying else ""
            return f"Claude request error: {err}{retry_fragment}{session_suffix}"

        if event_type == "tool_use":
            tool_name = payload.get("tool_name", "unknown_tool")
            tool_input = payload.get("tool_input")
            input_summary = self._summarize_tool_input(str(tool_name), tool_input)
            return f"Claude tool call: {tool_name} {input_summary}{session_suffix}"

        if event_type == "tool_result":
            tool_name = payload.get("tool_name", "tool")
            is_error = bool(payload.get("is_error", False))
            content = payload.get("content")
            input_summary = payload.get("tool_input_summary")
            input_fragment = f" {input_summary}" if input_summary else ""
            status = "error" if is_error else "ok"
            # Read/Bash/etc. results are high-volume: drop successes entirely
            # and show only a short error preview on failure.
            if self._is_noisy_tool_name(str(tool_name)) and not is_error:
                return ""
            if self._is_noisy_tool_name(str(tool_name)) and is_error:
                error_preview = self._shorten_text(content, max_chars=420)
                error_fragment = f" error={error_preview}" if error_preview else ""
                return (
                    f"Claude tool result: {tool_name} status={status}"
                    f"{input_fragment}{error_fragment}{session_suffix}"
                )
            content_preview = self._compact_json(content, max_chars=420)
            return (
                f"Claude tool result: {tool_name} status={status}"
                f"{input_fragment} content={content_preview}{session_suffix}"
            )

        if event_type == "text_block":
            preview = self._shorten_text(payload.get("preview"))
            return f"Claude says: {preview}{session_suffix}"

        if event_type == "thinking_block":
            chars = payload.get("chars", 0)
            return f"Claude thinking block: chars={chars}{session_suffix}"

        if event_type == "result_message":
            subtype = payload.get("subtype", "unknown")
            turns = payload.get("num_turns", 0)
            duration_ms = payload.get("duration_ms")
            duration_fragment = f" duration_ms={duration_ms}" if duration_ms is not None else ""
            return f"Claude result message: subtype={subtype} turns={turns}{duration_fragment}{session_suffix}"

        payload_json = self._compact_json(payload)
        return f"Claude event: type={event_type} payload={payload_json}{session_suffix}"

    @staticmethod
    def _shorten_text(value: Any, max_chars: int = 220) -> str:
        """Flatten *value* to one stripped line, truncated with "..." past *max_chars*."""
        text = str(value) if value is not None else ""
        text = text.strip().replace("\n", " ")
        if len(text) <= max_chars:
            return text
        return f"{text[:max_chars]}..."

    @staticmethod
    def _compact_json(value: Any, max_chars: int = 300) -> str:
        """Render *value* as sorted-key JSON, truncated past *max_chars*.

        Falls back to ``_shorten_text`` when JSON encoding fails (e.g. circular
        references or unsortable mixed-type keys).
        """
        with contextlib.suppress(TypeError, ValueError):
            rendered = json.dumps(value, sort_keys=True, default=str)
            if len(rendered) <= max_chars:
                return rendered
            return f"{rendered[:max_chars]}..."
        return ObservabilityManager._shorten_text(value, max_chars=max_chars)

    @staticmethod
    def _is_noisy_tool_name(tool_name: str) -> bool:
        """Return True for high-volume tools whose successful results are not logged."""
        return tool_name.lower() in {"read", "bash", "grep", "glob", "find", "ls"}

    @classmethod
    def _summarize_tool_input(cls, tool_name: str, tool_input: Any) -> str:
        """Build a short ``key=value`` summary of a tool's input for log lines."""
        if not isinstance(tool_input, dict):
            return f"input={cls._compact_json(tool_input, max_chars=140)}"

        normalized_name = tool_name.lower()
        if normalized_name == "read":
            path = tool_input.get("file_path") or tool_input.get("path")
            return f"path={cls._shorten_path(path, max_chars=120)}"
        if normalized_name == "bash":
            cmd = tool_input.get("command")
            compact_cmd = cls._abbreviate_workspace_paths(cmd)
            return f"command={cls._shorten_text(compact_cmd, max_chars=160)}"

        description = tool_input.get("description")
        if isinstance(description, str) and description.strip():
            return f"description={cls._shorten_text(description, max_chars=140)}"

        summary_keys = ("file_path", "path", "pattern", "query", "command", "name")
        summary: dict[str, Any] = {}
        for key in summary_keys:
            if key in tool_input:
                value = tool_input[key]
                if key in {"file_path", "path"}:
                    value = cls._shorten_path(value, max_chars=120)
                summary[key] = value
        if summary:
            return f"input={cls._compact_json(summary, max_chars=160)}"
        return f"input={cls._compact_json(tool_input, max_chars=160)}"

    @staticmethod
    def _is_filesystem_root(path: str) -> bool:
        """Return True when *path* is a filesystem root (e.g. "/" or "C:\\")."""
        return os.path.dirname(path) == path

    @classmethod
    def _shorten_path(cls, value: Any, max_chars: int = 120) -> str:
        """Show *value* relative to the current working directory when under it.

        At the filesystem root every path "starts with" cwd, so prefix stripping
        would garble paths; they are left untouched there.
        """
        text = str(value).strip() if value is not None else ""
        if not text:
            return ""

        normalized = text
        # Best effort: getcwd() can fail (e.g. deleted cwd); keep the raw text.
        with contextlib.suppress(Exception):
            cwd = os.path.abspath(os.getcwd())
            if not cls._is_filesystem_root(cwd):
                prefix = f"{cwd}{os.sep}"
                if os.path.isabs(text):
                    abs_path = os.path.abspath(text)
                    if abs_path == cwd:
                        normalized = "."
                    elif abs_path.startswith(prefix):
                        normalized = os.path.relpath(abs_path, cwd)
                    else:
                        normalized = text.replace(prefix, "")
                else:
                    normalized = text.replace(prefix, "")
        return cls._shorten_text(normalized, max_chars=max_chars)

    @staticmethod
    def _abbreviate_workspace_paths(value: Any) -> str:
        """Rewrite occurrences of the cwd inside *value* (e.g. a shell command).

        ``<cwd>/x`` becomes ``x``, and a bare ``<cwd>`` becomes ``.``. A bare cwd
        is only replaced at a path boundary, so sibling paths sharing the prefix
        (``<cwd>-other``) are not garbled. Nothing is rewritten when cwd is the
        filesystem root, where every ``/`` would otherwise be replaced.
        """
        text = str(value).strip() if value is not None else ""
        if not text:
            return ""
        compact = text
        # Best effort: getcwd() can fail (e.g. deleted cwd); keep the raw text.
        with contextlib.suppress(Exception):
            cwd = os.path.abspath(os.getcwd())
            if os.path.dirname(cwd) != cwd:
                compact = compact.replace(f"{cwd}{os.sep}", "")
                compact = re.sub(re.escape(cwd) + r"(?![\w.-])", ".", compact)
        return compact

    @classmethod
    def _estimate_chars(cls, value: Any) -> int:
        """Approximate the rendered size of *value* in characters."""
        if value is None:
            return 0
        if isinstance(value, str):
            return len(value)
        with contextlib.suppress(TypeError, ValueError):
            return len(json.dumps(value, default=str))
        return len(str(value))

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------

    def get_metrics(self) -> dict:
        """Return accumulated metrics (total tokens, traces, errors).

        The result is a snapshot: each agent's counter dict is copied, so later
        activity does not change it and mutating it does not affect live metrics.
        """
        return {
            "total_tokens": self._metrics["total_tokens"],
            "total_traces": self._metrics["total_traces"],
            "total_errors": self._metrics["total_errors"],
            "total_claude_events": self._metrics["total_claude_events"],
            "total_tool_calls": self._metrics["total_tool_calls"],
            "per_agent": {
                agent: dict(stats) for agent, stats in self._metrics["per_agent"].items()
            },
        }