Files
ai_ops2/app_factory/core/observability.py
2026-02-25 23:49:54 -05:00

573 lines
22 KiB
Python

"""Observability Manager - LangSmith tracing, logging, and monitoring."""
import contextlib
import functools
import inspect
import json
import logging
import os
import re
import time
import traceback
import uuid
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Callable, Optional

from app_factory.core.logging_utils import (
    FG_BLUE,
    FG_CYAN,
    FG_MAGENTA,
    LEVEL_COLORS,
    colorize,
    should_use_color,
)
class _StructuredFormatter(logging.Formatter):
    """Format records as ``[ISO_TIMESTAMP] [AGENT] [TASK] [LEVEL] message``.

    ``agent_name`` and ``task_id`` are read from the record's ``extra`` fields,
    defaulting to ``SYSTEM`` and ``-``. When color is enabled, each bracketed
    field is colorized, and messages starting with a known event prefix get an
    event-specific color. Exception and stack information attached to the
    record are appended on following lines, as ``logging.Formatter`` does.
    """

    # Message prefix -> color style applied to the whole message when it
    # starts with that prefix (first match wins, in insertion order).
    _EVENT_COLORS = {
        "State transition": FG_MAGENTA,
        "Token usage": FG_BLUE,
        "Claude event": FG_BLUE,
        "Trace started": FG_CYAN,
        "Trace ended": FG_CYAN,
    }

    def __init__(self, use_color: Optional[bool] = None):
        """Create the formatter.

        Args:
            use_color: Explicit color preference; ``None`` lets
                ``should_use_color`` decide.
        """
        super().__init__()
        self._use_color = should_use_color(use_color=use_color)

    def _colorize_message(self, message: str) -> str:
        """Return ``message`` colored by the first matching event prefix, if any."""
        for prefix, style in self._EVENT_COLORS.items():
            if message.startswith(prefix):
                return colorize(message, style, self._use_color)
        return message

    def format(self, record: logging.LogRecord) -> str:
        """Render ``record`` as a single structured line (plus any traceback)."""
        # Use the record's creation time rather than "now": records may be
        # formatted later than they were emitted (queued/buffered handlers).
        ts = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%S"
        )
        # str() so non-string extras (e.g. integer task ids) are safe to colorize.
        agent = str(getattr(record, "agent_name", "SYSTEM"))
        task = str(getattr(record, "task_id", "-"))
        level = record.levelname
        message = record.getMessage()
        if self._use_color:
            ts = colorize(ts, FG_BLUE, enabled=True)
            agent = colorize(agent, FG_CYAN, enabled=True)
            task = colorize(task, FG_MAGENTA, enabled=True)
            level = colorize(level, LEVEL_COLORS.get(record.levelno, ""), enabled=True)
            message = self._colorize_message(message)
        line = f"[{ts}] [{agent}] [{task}] [{level}] {message}"
        # Keep the stdlib contract: logger.exception()/exc_info=True and
        # stack_info=True must not be silently discarded. exc_text is cached on
        # the record exactly as logging.Formatter.format does.
        if record.exc_info and not record.exc_text:
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            line = f"{line}\n{record.exc_text}"
        if record.stack_info:
            line = f"{line}\n{self.formatStack(record.stack_info)}"
        return line
class _TraceContext:
"""Async context manager for trace_context()."""
def __init__(self, manager: "ObservabilityManager", agent_name: str, task_id: str):
self._manager = manager
self._agent_name = agent_name
self._task_id = task_id
self._run_id: Optional[str] = None
async def __aenter__(self) -> str:
self._run_id = self._manager.start_trace(self._agent_name, self._task_id)
return self._run_id
async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
if exc_val is not None:
self._manager.end_trace(
self._run_id, error=f"{exc_type.__name__}: {exc_val}"
)
else:
self._manager.end_trace(self._run_id)
return False # do not suppress exceptions
class ObservabilityManager:
"""Wraps LangSmith client for tracing and structured logging."""
_CLAUDE_EVENT_FILTERS = {
"quiet": {
"request_start",
"request_error",
"request_complete",
"tool_use",
"tool_result",
},
"focused": {
"request_start",
"request_error",
"request_complete",
"tool_use",
"tool_result",
"thinking_block",
"result_message",
},
"verbose": None, # no filtering
"off": set(),
}
def __init__(self, project_name: str = None, claude_event_mode: str | None = None):
self.project_name = project_name or os.getenv("LANGSMITH_PROJECT", "app-factory")
requested_mode = (
claude_event_mode
or os.getenv("APP_FACTORY_CLAUDE_EVENT_MODE", "quiet")
)
normalized_mode = requested_mode.strip().lower() if isinstance(requested_mode, str) else "focused"
self._claude_event_mode = (
normalized_mode if normalized_mode in self._CLAUDE_EVENT_FILTERS else "focused"
)
# --- LangSmith client (optional) ---
self._client = None
try:
from langsmith import Client # noqa: F811
self._client = Client()
except Exception as exc:
# LangSmith not configured or unreachable -- degrade gracefully
logging.getLogger(__name__).warning(
"LangSmith unavailable (%s). Tracing disabled.", exc
)
# --- Structured logger ---
self.logger = logging.getLogger(f"app_factory.{self.project_name}")
if not self.logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(_StructuredFormatter())
self.logger.addHandler(handler)
self.logger.setLevel(logging.DEBUG)
self.logger.propagate = False
self._tool_name_by_use_id: dict[str, str] = {}
self._tool_summary_by_use_id: dict[str, str] = {}
# --- Internal metrics ---
self._active_runs: dict[str, dict] = {}
self._metrics = {
"total_tokens": 0,
"total_traces": 0,
"total_errors": 0,
"total_claude_events": 0,
"total_tool_calls": 0,
"per_agent": defaultdict(lambda: {
"tokens": 0,
"traces": 0,
"errors": 0,
"claude_events": 0,
"tool_calls": 0,
}),
}
# ------------------------------------------------------------------
# Tracing
# ------------------------------------------------------------------
def start_trace(self, agent_name: str, task_id: str, inputs: dict = None) -> str:
"""Start a new trace run, return run_id."""
run_id = uuid.uuid4().hex
self._metrics["total_traces"] += 1
self._metrics["per_agent"][agent_name]["traces"] += 1
self._active_runs[run_id] = {
"agent_name": agent_name,
"task_id": task_id,
"start_time": time.time(),
}
self.logger.info(
"Trace started: run_id=%s",
run_id,
extra={"agent_name": agent_name, "task_id": task_id},
)
try:
if self._client is not None:
self._client.create_run(
name=f"{agent_name}:{task_id}",
run_type="chain",
inputs=inputs or {},
id=run_id,
project_name=self.project_name,
)
except Exception as exc:
self.logger.warning(
"LangSmith create_run failed: %s",
exc,
extra={"agent_name": agent_name, "task_id": task_id},
)
return run_id
def end_trace(self, run_id: str, outputs: dict = None, error: str = None):
"""End a trace run with outputs or error."""
run_info = self._active_runs.pop(run_id, {})
agent_name = run_info.get("agent_name", "unknown")
task_id = run_info.get("task_id", "-")
if error:
self._metrics["total_errors"] += 1
self._metrics["per_agent"][agent_name]["errors"] += 1
self.logger.error(
"Trace error: run_id=%s error=%s",
run_id,
error,
extra={"agent_name": agent_name, "task_id": task_id},
)
else:
self.logger.info(
"Trace ended: run_id=%s",
run_id,
extra={"agent_name": agent_name, "task_id": task_id},
)
try:
if self._client is not None:
update_kwargs: dict[str, Any] = {"end_time": datetime.now(timezone.utc)}
if outputs:
update_kwargs["outputs"] = outputs
if error:
update_kwargs["error"] = error
self._client.update_run(run_id, **update_kwargs)
except Exception as exc:
self.logger.warning(
"LangSmith update_run failed: %s",
exc,
extra={"agent_name": agent_name, "task_id": task_id},
)
# ------------------------------------------------------------------
# Decorator
# ------------------------------------------------------------------
def trace_agent_execution(self, agent_name: str, task_id: str):
"""Decorator for tracking agent calls with context."""
def decorator(func: Callable):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
run_id = self.start_trace(agent_name, task_id, inputs={"args": str(args), "kwargs": str(kwargs)})
try:
result = await func(*args, **kwargs)
self.end_trace(run_id, outputs={"result": str(result)})
return result
except Exception as exc:
self.end_trace(run_id, error=f"{type(exc).__name__}: {exc}")
raise
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
run_id = self.start_trace(agent_name, task_id, inputs={"args": str(args), "kwargs": str(kwargs)})
try:
result = func(*args, **kwargs)
self.end_trace(run_id, outputs={"result": str(result)})
return result
except Exception as exc:
self.end_trace(run_id, error=f"{type(exc).__name__}: {exc}")
raise
if inspect.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
return decorator
# ------------------------------------------------------------------
# Async helpers
# ------------------------------------------------------------------
async def trace_agent(self, agent_name: str, task_id: str, func: Callable):
"""Async helper to run a function within a trace context."""
run_id = self.start_trace(agent_name, task_id)
try:
result = await func()
self.end_trace(run_id, outputs={"result": str(result)})
return result
except Exception as exc:
self.end_trace(run_id, error=f"{type(exc).__name__}: {exc}")
raise
def trace_context(self, agent_name: str, task_id: str) -> _TraceContext:
"""Return an async context manager for tracing.
Usage::
async with obs.trace_context("agent", "task_id") as run_id:
...
"""
return _TraceContext(self, agent_name, task_id)
# ------------------------------------------------------------------
# Logging helpers
# ------------------------------------------------------------------
def log_state_transition(self, from_state: str, to_state: str, metadata: dict = None):
"""Log a state machine transition."""
msg = f"State transition: {from_state} -> {to_state}"
if metadata:
msg += f" metadata={metadata}"
self.logger.info(msg, extra={"agent_name": "STATE_MACHINE", "task_id": "-"})
def log_token_usage(
self,
agent_name: str,
task_id: str,
input_tokens: int,
output_tokens: int,
model: str = None,
):
"""Log token usage for cost monitoring."""
total = input_tokens + output_tokens
self._metrics["total_tokens"] += total
self._metrics["per_agent"][agent_name]["tokens"] += total
msg = f"Token usage: input={input_tokens} output={output_tokens} total={total}"
if model:
msg += f" model={model}"
self.logger.info(msg, extra={"agent_name": agent_name, "task_id": task_id})
def log_error(self, agent_name: str, task_id: str, error: Exception, context: dict = None):
"""Log an error with full stack trace."""
self._metrics["total_errors"] += 1
self._metrics["per_agent"][agent_name]["errors"] += 1
tb = traceback.format_exception(type(error), error, error.__traceback__)
msg = f"Error: {error}\n{''.join(tb)}"
if context:
msg += f" context={context}"
self.logger.error(msg, extra={"agent_name": agent_name, "task_id": task_id})
def log_claude_event(
self,
agent_name: str,
task_id: str,
event_type: str,
payload: dict | None = None,
):
"""Log a Claude SDK/CLI event in structured form."""
self._metrics["total_claude_events"] += 1
self._metrics["per_agent"][agent_name]["claude_events"] += 1
normalized_event = (event_type or "unknown").strip().lower()
normalized_payload = dict(payload or {})
if normalized_event == "tool_use":
self._metrics["total_tool_calls"] += 1
self._metrics["per_agent"][agent_name]["tool_calls"] += 1
tool_use_id = normalized_payload.get("tool_use_id")
tool_name = normalized_payload.get("tool_name")
tool_input = normalized_payload.get("tool_input")
if isinstance(tool_use_id, str) and isinstance(tool_name, str):
self._tool_name_by_use_id[tool_use_id] = tool_name
self._tool_summary_by_use_id[tool_use_id] = self._summarize_tool_input(
str(tool_name),
tool_input,
)
if normalized_event == "tool_result":
tool_use_id = normalized_payload.get("tool_use_id")
if isinstance(tool_use_id, str):
tool_name = self._tool_name_by_use_id.pop(tool_use_id, None)
tool_summary = self._tool_summary_by_use_id.pop(tool_use_id, None)
if "tool_name" not in normalized_payload and tool_name:
normalized_payload["tool_name"] = tool_name
if "tool_input_summary" not in normalized_payload and tool_summary:
normalized_payload["tool_input_summary"] = tool_summary
if not self._should_log_claude_event(normalized_event):
return
msg = self._format_claude_event_message(normalized_event, normalized_payload)
if not msg:
return
self.logger.debug(msg, extra={"agent_name": agent_name, "task_id": task_id})
def _should_log_claude_event(self, event_type: str) -> bool:
allowed = self._CLAUDE_EVENT_FILTERS.get(self._claude_event_mode)
if allowed is None:
return True
return event_type in allowed
def _format_claude_event_message(self, event_type: str, payload: dict[str, Any]) -> str:
session_id = payload.get("session_id")
session_suffix = f" session={session_id}" if session_id else ""
if event_type == "request_start":
model = payload.get("model") or "default"
prompt_chars = payload.get("prompt_chars", 0)
return f"Claude request started: model={model} prompt_chars={prompt_chars}{session_suffix}"
if event_type == "request_complete":
inp = payload.get("input_tokens", 0)
out = payload.get("output_tokens", 0)
subtype = payload.get("result_subtype") or "unknown"
preview = self._shorten_text(payload.get("result_preview"), max_chars=140)
preview_fragment = f' result="{preview}"' if preview else ""
return (
f"Claude request completed: subtype={subtype} "
f"tokens={inp}->{out}{preview_fragment}{session_suffix}"
)
if event_type == "request_error":
err = self._shorten_text(payload.get("error"))
retrying = payload.get("retrying")
retry_fragment = " retrying=true" if retrying else ""
return f"Claude request error: {err}{retry_fragment}{session_suffix}"
if event_type == "tool_use":
tool_name = payload.get("tool_name", "unknown_tool")
tool_input = payload.get("tool_input")
input_summary = self._summarize_tool_input(str(tool_name), tool_input)
return f"Claude tool call: {tool_name} {input_summary}{session_suffix}"
if event_type == "tool_result":
tool_name = payload.get("tool_name", "tool")
is_error = bool(payload.get("is_error", False))
content = payload.get("content")
input_summary = payload.get("tool_input_summary")
input_fragment = f" {input_summary}" if input_summary else ""
status = "error" if is_error else "ok"
if self._is_noisy_tool_name(str(tool_name)) and not is_error:
return ""
if self._is_noisy_tool_name(str(tool_name)) and is_error:
error_preview = self._shorten_text(content, max_chars=420)
error_fragment = f" error={error_preview}" if error_preview else ""
return (
f"Claude tool result: {tool_name} status={status}"
f"{input_fragment}{error_fragment}{session_suffix}"
)
content_preview = self._compact_json(content, max_chars=420)
return (
f"Claude tool result: {tool_name} status={status}"
f"{input_fragment} content={content_preview}{session_suffix}"
)
if event_type == "text_block":
preview = self._shorten_text(payload.get("preview"))
return f"Claude says: {preview}{session_suffix}"
if event_type == "thinking_block":
chars = payload.get("chars", 0)
return f"Claude thinking block: chars={chars}{session_suffix}"
if event_type == "result_message":
subtype = payload.get("subtype", "unknown")
turns = payload.get("num_turns", 0)
duration_ms = payload.get("duration_ms")
duration_fragment = f" duration_ms={duration_ms}" if duration_ms is not None else ""
return f"Claude result message: subtype={subtype} turns={turns}{duration_fragment}{session_suffix}"
payload_json = self._compact_json(payload)
return f"Claude event: type={event_type} payload={payload_json}{session_suffix}"
@staticmethod
def _shorten_text(value: Any, max_chars: int = 220) -> str:
text = str(value) if value is not None else ""
text = text.strip().replace("\n", " ")
if len(text) <= max_chars:
return text
return f"{text[:max_chars]}..."
@staticmethod
def _compact_json(value: Any, max_chars: int = 300) -> str:
with contextlib.suppress(TypeError, ValueError):
rendered = json.dumps(value, sort_keys=True, default=str)
if len(rendered) <= max_chars:
return rendered
return f"{rendered[:max_chars]}..."
return ObservabilityManager._shorten_text(value, max_chars=max_chars)
@staticmethod
def _is_noisy_tool_name(tool_name: str) -> bool:
return tool_name.lower() in {"read", "bash", "grep", "glob", "find", "ls"}
@classmethod
def _summarize_tool_input(cls, tool_name: str, tool_input: Any) -> str:
if not isinstance(tool_input, dict):
return f"input={cls._compact_json(tool_input, max_chars=140)}"
normalized_name = tool_name.lower()
if normalized_name == "read":
path = tool_input.get("file_path") or tool_input.get("path")
return f"path={cls._shorten_path(path, max_chars=120)}"
if normalized_name == "bash":
cmd = tool_input.get("command")
compact_cmd = cls._abbreviate_workspace_paths(cmd)
return f"command={cls._shorten_text(compact_cmd, max_chars=160)}"
description = tool_input.get("description")
if isinstance(description, str) and description.strip():
return f"description={cls._shorten_text(description, max_chars=140)}"
summary_keys = ("file_path", "path", "pattern", "query", "command", "name")
summary: dict[str, Any] = {}
for key in summary_keys:
if key in tool_input:
value = tool_input[key]
if key in {"file_path", "path"}:
value = cls._shorten_path(value, max_chars=120)
summary[key] = value
if summary:
return f"input={cls._compact_json(summary, max_chars=160)}"
return f"input={cls._compact_json(tool_input, max_chars=160)}"
@classmethod
def _shorten_path(cls, value: Any, max_chars: int = 120) -> str:
text = str(value).strip() if value is not None else ""
if not text:
return ""
normalized = text
with contextlib.suppress(Exception):
cwd = os.path.abspath(os.getcwd())
if os.path.isabs(text):
abs_path = os.path.abspath(text)
if abs_path == cwd:
normalized = "."
elif abs_path.startswith(f"{cwd}{os.sep}"):
normalized = os.path.relpath(abs_path, cwd)
else:
normalized = text.replace(f"{cwd}{os.sep}", "")
return cls._shorten_text(normalized, max_chars=max_chars)
@staticmethod
def _abbreviate_workspace_paths(value: Any) -> str:
text = str(value).strip() if value is not None else ""
if not text:
return ""
compact = text
with contextlib.suppress(Exception):
cwd = os.path.abspath(os.getcwd())
compact = compact.replace(f"{cwd}{os.sep}", "")
compact = compact.replace(cwd, ".")
return compact
@classmethod
def _estimate_chars(cls, value: Any) -> int:
if value is None:
return 0
if isinstance(value, str):
return len(value)
with contextlib.suppress(TypeError, ValueError):
return len(json.dumps(value, default=str))
return len(str(value))
# ------------------------------------------------------------------
# Metrics
# ------------------------------------------------------------------
def get_metrics(self) -> dict:
"""Return accumulated metrics (total tokens, traces, errors)."""
return {
"total_tokens": self._metrics["total_tokens"],
"total_traces": self._metrics["total_traces"],
"total_errors": self._metrics["total_errors"],
"total_claude_events": self._metrics["total_claude_events"],
"total_tool_calls": self._metrics["total_tool_calls"],
"per_agent": dict(self._metrics["per_agent"]),
}