"""Tests for ObservabilityManager."""
import asyncio
import logging
import os
import sys
from unittest.mock import MagicMock, patch

import pytest

from app_factory.core.observability import ObservabilityManager, _StructuredFormatter

# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def obs():
"""ObservabilityManager with LangSmith disabled (import fails)."""
    # langsmith may not be installed; mapping it to None in sys.modules
    # makes ``import langsmith`` raise ImportError.
with patch.dict(sys.modules, {"langsmith": None}):
manager = ObservabilityManager("test-project")
assert manager._client is None
return manager

@pytest.fixture
def obs_with_client():
"""ObservabilityManager with a mocked LangSmith client."""
mock_client = MagicMock()
mock_langsmith = MagicMock()
mock_langsmith.Client.return_value = mock_client
with patch.dict(sys.modules, {"langsmith": mock_langsmith}):
manager = ObservabilityManager("test-project")
return manager, mock_client

# ---------------------------------------------------------------------------
# Initialization
# ---------------------------------------------------------------------------
class TestInitialization:
def test_explicit_project_name(self, obs):
assert obs.project_name == "test-project"
def test_default_project_from_env(self):
with patch.dict(os.environ, {"LANGSMITH_PROJECT": "env-project"}):
with patch.dict(sys.modules, {"langsmith": None}):
manager = ObservabilityManager()
assert manager.project_name == "env-project"
def test_fallback_project_name(self):
env = os.environ.copy()
env.pop("LANGSMITH_PROJECT", None)
with patch.dict(os.environ, env, clear=True):
with patch.dict(sys.modules, {"langsmith": None}):
manager = ObservabilityManager()
assert manager.project_name == "app-factory"
def test_graceful_degradation_no_langsmith(self, obs):
assert obs._client is None
def test_logger_created(self, obs):
assert obs.logger is not None
assert obs.logger.level == logging.DEBUG
assert obs.logger.propagate is False
def test_claude_event_mode_from_env(self):
with patch.dict(os.environ, {"APP_FACTORY_CLAUDE_EVENT_MODE": "verbose"}):
with patch.dict(sys.modules, {"langsmith": None}):
manager = ObservabilityManager("test-project")
assert manager._claude_event_mode == "verbose"
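    # Sketch of an additional check, assuming the explicit constructor
    # argument takes precedence over LANGSMITH_PROJECT (behaviour inferred,
    # not confirmed against the implementation).
    def test_explicit_name_overrides_env(self):
        with patch.dict(os.environ, {"LANGSMITH_PROJECT": "env-project"}):
            with patch.dict(sys.modules, {"langsmith": None}):
                manager = ObservabilityManager("explicit-project")
        assert manager.project_name == "explicit-project"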

# ---------------------------------------------------------------------------
# Tracing lifecycle
# ---------------------------------------------------------------------------
class TestTracing:
def test_start_trace_returns_run_id(self, obs):
run_id = obs.start_trace("pm_agent", "1.2")
assert isinstance(run_id, str)
        assert len(run_id) == 32  # a hex-encoded UUID is 32 characters
def test_start_and_end_trace(self, obs):
run_id = obs.start_trace("pm_agent", "1.2", inputs={"key": "val"})
obs.end_trace(run_id, outputs={"result": "ok"})
assert run_id not in obs._active_runs
def test_end_trace_with_error(self, obs):
run_id = obs.start_trace("pm_agent", "1.2")
obs.end_trace(run_id, error="something went wrong")
metrics = obs.get_metrics()
assert metrics["total_errors"] == 1
def test_langsmith_create_run_called(self, obs_with_client):
manager, mock_client = obs_with_client
manager.start_trace("agent", "1.1", inputs={"x": 1})
mock_client.create_run.assert_called_once()
def test_langsmith_update_run_called(self, obs_with_client):
manager, mock_client = obs_with_client
run_id = manager.start_trace("agent", "1.1")
manager.end_trace(run_id, outputs={"y": 2})
mock_client.update_run.assert_called_once()
def test_langsmith_failure_does_not_raise(self):
mock_client = MagicMock()
mock_client.create_run.side_effect = Exception("network error")
mock_langsmith = MagicMock()
mock_langsmith.Client.return_value = mock_client
with patch.dict(sys.modules, {"langsmith": mock_langsmith}):
manager = ObservabilityManager("test-project")
# Should not raise
run_id = manager.start_trace("agent", "1.1")
assert isinstance(run_id, str)
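    # Companion sketch to the test above, assuming end_trace swallows
    # LangSmith client errors the same way start_trace does.
    def test_langsmith_update_failure_does_not_raise(self):
        mock_client = MagicMock()
        mock_client.update_run.side_effect = Exception("network error")
        mock_langsmith = MagicMock()
        mock_langsmith.Client.return_value = mock_client
        with patch.dict(sys.modules, {"langsmith": mock_langsmith}):
            manager = ObservabilityManager("test-project")
        run_id = manager.start_trace("agent", "1.1")
        manager.end_trace(run_id, outputs={"y": 2})  # should not raise
        assert run_id not in manager._active_runs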

# ---------------------------------------------------------------------------
# Decorator
# ---------------------------------------------------------------------------
class TestDecorator:
def test_sync_decorator(self, obs):
@obs.trace_agent_execution("agent", "1.1")
def my_func(x):
return x * 2
result = my_func(5)
assert result == 10
assert obs.get_metrics()["total_traces"] == 1
def test_async_decorator(self, obs):
@obs.trace_agent_execution("agent", "1.1")
async def my_async_func(x):
return x + 1
result = asyncio.run(my_async_func(10))
assert result == 11
assert obs.get_metrics()["total_traces"] == 1
def test_decorator_records_error(self, obs):
@obs.trace_agent_execution("agent", "1.1")
def failing():
raise ValueError("boom")
with pytest.raises(ValueError, match="boom"):
failing()
assert obs.get_metrics()["total_errors"] == 1
def test_decorator_preserves_function_name(self, obs):
@obs.trace_agent_execution("agent", "1.1")
def my_named_func():
pass
assert my_named_func.__name__ == "my_named_func"
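    # Minimal sketch assuming the decorator opens a fresh trace per call,
    # consistent with total_traces == 1 after the single calls above.
    def test_decorator_traces_each_call(self, obs):
        @obs.trace_agent_execution("agent", "1.1")
        def my_func(x):
            return x
        my_func(1)
        my_func(2)
        assert obs.get_metrics()["total_traces"] == 2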

# ---------------------------------------------------------------------------
# Async context manager
# ---------------------------------------------------------------------------
class TestAsyncContextManager:
def test_trace_context_success(self, obs):
async def run():
async with obs.trace_context("agent", "1.1") as run_id:
assert isinstance(run_id, str)
return run_id
run_id = asyncio.run(run())
assert run_id not in obs._active_runs
assert obs.get_metrics()["total_traces"] == 1
    def test_trace_context_exception(self, obs):
        async def run():
            with pytest.raises(RuntimeError):
                async with obs.trace_context("agent", "1.1"):
                    raise RuntimeError("test error")
        asyncio.run(run())
assert obs.get_metrics()["total_errors"] == 1
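    # Sketch assuming the context manager also evicts the run from
    # _active_runs on the error path, mirroring the success path above.
    def test_trace_context_cleans_up_on_exception(self, obs):
        async def run():
            with pytest.raises(RuntimeError):
                async with obs.trace_context("agent", "1.1") as run_id:
                    raise RuntimeError("test error")
            return run_id
        run_id = asyncio.run(run())
        assert run_id not in obs._active_runs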

# ---------------------------------------------------------------------------
# Async trace_agent helper
# ---------------------------------------------------------------------------
class TestTraceAgent:
def test_trace_agent_success(self, obs):
async def do_test():
async def my_coro():
return 42
return await obs.trace_agent("agent", "1.1", my_coro)
result = asyncio.run(do_test())
assert result == 42
assert obs.get_metrics()["total_traces"] == 1
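    # Sketch assuming trace_agent re-raises failures and records them in
    # the error metrics, like the decorator does (behaviour inferred).
    def test_trace_agent_error(self, obs):
        async def do_test():
            async def failing():
                raise ValueError("boom")
            await obs.trace_agent("agent", "1.1", failing)
        with pytest.raises(ValueError, match="boom"):
            asyncio.run(do_test())
        assert obs.get_metrics()["total_errors"] == 1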

# ---------------------------------------------------------------------------
# Logging format
# ---------------------------------------------------------------------------
class TestLoggingFormat:
def test_structured_formatter(self):
formatter = _StructuredFormatter(use_color=False)
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="",
lineno=0,
msg="hello world",
args=(),
exc_info=None,
)
record.agent_name = "PM_AGENT"
record.task_id = "task-1.2"
formatted = formatter.format(record)
assert "[PM_AGENT]" in formatted
assert "[task-1.2]" in formatted
assert "[INFO]" in formatted
assert "hello world" in formatted
        # Line should begin with an ISO-style timestamp, e.g. "[2026-02-25..."
        assert formatted.startswith("[20")
        # No ANSI escape sequences when color is disabled
        assert "\x1b[" not in formatted
def test_formatter_defaults(self):
formatter = _StructuredFormatter(use_color=False)
record = logging.LogRecord(
name="test",
level=logging.WARNING,
pathname="",
lineno=0,
msg="warn msg",
args=(),
exc_info=None,
)
formatted = formatter.format(record)
assert "[SYSTEM]" in formatted
assert "[-]" in formatted
def test_formatter_can_render_colors(self):
formatter = _StructuredFormatter(use_color=True)
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="",
lineno=0,
msg="Trace started: run_id=abc123",
args=(),
exc_info=None,
)
record.agent_name = "PM_AGENT"
record.task_id = "task-1.2"
formatted = formatter.format(record)
assert "\x1b[" in formatted

# ---------------------------------------------------------------------------
# Logging methods
# ---------------------------------------------------------------------------
class TestLoggingMethods:
    class _CaptureHandler(logging.Handler):
        """Collects raw log messages so tests can assert on their content."""
        def __init__(self):
            super().__init__()
            self.messages: list[str] = []
        def emit(self, record):
            self.messages.append(record.getMessage())
def test_log_state_transition(self, obs):
capture = self._CaptureHandler()
obs.logger.addHandler(capture)
obs.log_state_transition("idle", "processing", {"reason": "new task"})
obs.logger.removeHandler(capture)
assert any("idle -> processing" in m for m in capture.messages)
def test_log_token_usage(self, obs):
obs.log_token_usage("pm_agent", "1.1", input_tokens=100, output_tokens=50, model="claude-3")
metrics = obs.get_metrics()
assert metrics["total_tokens"] == 150
assert metrics["per_agent"]["pm_agent"]["tokens"] == 150
def test_log_error(self, obs):
try:
raise ValueError("test error")
except ValueError as e:
obs.log_error("pm_agent", "1.1", e, context={"step": "parse"})
metrics = obs.get_metrics()
assert metrics["total_errors"] == 1
assert metrics["per_agent"]["pm_agent"]["errors"] == 1
def test_log_claude_event(self, obs):
obs.log_claude_event("pm_agent", "1.1", "tool_use", {"tool_name": "Bash"})
obs.log_claude_event("pm_agent", "1.1", "result_message", {"subtype": "success"})
metrics = obs.get_metrics()
assert metrics["total_claude_events"] == 2
assert metrics["total_tool_calls"] == 1
assert metrics["per_agent"]["pm_agent"]["claude_events"] == 2
assert metrics["per_agent"]["pm_agent"]["tool_calls"] == 1
def test_log_claude_event_readable_format(self, obs):
capture = self._CaptureHandler()
obs.logger.addHandler(capture)
obs.log_claude_event(
"pm_agent",
"1.1",
"tool_use",
{"tool_name": "Bash", "tool_use_id": "toolu_1", "tool_input": {"command": "pwd"}},
)
obs.logger.removeHandler(capture)
combined = "\n".join(capture.messages)
assert "Claude tool call: Bash" in combined
assert "command=pwd" in combined
def test_log_claude_event_noisy_tool_result_is_suppressed_on_success(self, obs):
capture = self._CaptureHandler()
obs.logger.addHandler(capture)
obs.log_claude_event(
"pm_agent",
"1.1",
"tool_use",
{"tool_name": "Read", "tool_use_id": "toolu_1", "tool_input": {"file_path": "README.md"}},
)
obs.log_claude_event(
"pm_agent",
"1.1",
"tool_result",
{"tool_use_id": "toolu_1", "is_error": False, "content": "x" * 1000},
)
obs.logger.removeHandler(capture)
combined = "\n".join(capture.messages)
assert "Claude tool call: Read path=README.md" in combined
assert "Claude tool result: Read" not in combined
def test_log_claude_event_noisy_tool_result_logs_errors_with_context(self, obs):
capture = self._CaptureHandler()
obs.logger.addHandler(capture)
obs.log_claude_event(
"pm_agent",
"1.1",
"tool_use",
{"tool_name": "Read", "tool_use_id": "toolu_1", "tool_input": {"file_path": "README.md"}},
)
obs.log_claude_event(
"pm_agent",
"1.1",
"tool_result",
{"tool_use_id": "toolu_1", "is_error": True, "content": "permission denied"},
)
obs.logger.removeHandler(capture)
combined = "\n".join(capture.messages)
assert "Claude tool result: Read status=error path=README.md error=permission denied" in combined
def test_log_claude_event_filters_noise_by_default(self, obs):
capture = self._CaptureHandler()
obs.logger.addHandler(capture)
obs.log_claude_event("pm_agent", "1.1", "stream_event", {"event": "delta"})
obs.logger.removeHandler(capture)
combined = "\n".join(capture.messages)
assert "stream_event" not in combined
def test_log_claude_event_verbose_mode_includes_noise(self):
with patch.dict(sys.modules, {"langsmith": None}):
manager = ObservabilityManager("test-project", claude_event_mode="verbose")
capture = self._CaptureHandler()
manager.logger.addHandler(capture)
manager.log_claude_event("pm_agent", "1.1", "stream_event", {"event": "delta"})
manager.logger.removeHandler(capture)
combined = "\n".join(capture.messages)
assert "Claude event: type=stream_event" in combined

# ---------------------------------------------------------------------------
# Metrics
# ---------------------------------------------------------------------------
class TestMetrics:
def test_initial_metrics(self, obs):
metrics = obs.get_metrics()
assert metrics["total_tokens"] == 0
assert metrics["total_traces"] == 0
assert metrics["total_errors"] == 0
assert metrics["total_claude_events"] == 0
assert metrics["total_tool_calls"] == 0
def test_metrics_accumulate(self, obs):
obs.start_trace("a1", "1.1")
obs.start_trace("a1", "1.2")
obs.start_trace("a2", "2.1")
obs.log_token_usage("a1", "1.1", 10, 20)
obs.log_token_usage("a2", "2.1", 5, 5)
metrics = obs.get_metrics()
assert metrics["total_traces"] == 3
assert metrics["total_tokens"] == 40
assert metrics["per_agent"]["a1"]["traces"] == 2
assert metrics["per_agent"]["a2"]["traces"] == 1
assert metrics["per_agent"]["a1"]["tokens"] == 30
assert metrics["per_agent"]["a2"]["tokens"] == 10

# ---------------------------------------------------------------------------
# Graceful degradation
# ---------------------------------------------------------------------------
class TestGracefulDegradation:
def test_all_operations_work_without_langsmith(self, obs):
"""Every public method should work fine with _client=None."""
assert obs._client is None
run_id = obs.start_trace("agent", "1.1", inputs={"a": 1})
obs.end_trace(run_id, outputs={"b": 2})
obs.end_trace("nonexistent-id", error="whatever")
obs.log_state_transition("a", "b")
obs.log_token_usage("agent", "1.1", 10, 20)
try:
raise RuntimeError("test")
except RuntimeError as e:
obs.log_error("agent", "1.1", e)
metrics = obs.get_metrics()
assert metrics["total_traces"] == 1