"""Tests for ObservabilityManager.""" import asyncio import logging import os import sys from unittest.mock import MagicMock, patch import pytest from app_factory.core.observability import ObservabilityManager, _StructuredFormatter # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def obs(): """ObservabilityManager with LangSmith disabled (import fails).""" # langsmith may not be installed; simulate import failure with patch.dict(sys.modules, {"langsmith": None}): manager = ObservabilityManager("test-project") assert manager._client is None return manager @pytest.fixture def obs_with_client(): """ObservabilityManager with a mocked LangSmith client.""" mock_client = MagicMock() mock_langsmith = MagicMock() mock_langsmith.Client.return_value = mock_client with patch.dict(sys.modules, {"langsmith": mock_langsmith}): manager = ObservabilityManager("test-project") return manager, mock_client # --------------------------------------------------------------------------- # Initialization # --------------------------------------------------------------------------- class TestInitialization: def test_explicit_project_name(self, obs): assert obs.project_name == "test-project" def test_default_project_from_env(self): with patch.dict(os.environ, {"LANGSMITH_PROJECT": "env-project"}): with patch.dict(sys.modules, {"langsmith": None}): manager = ObservabilityManager() assert manager.project_name == "env-project" def test_fallback_project_name(self): env = os.environ.copy() env.pop("LANGSMITH_PROJECT", None) with patch.dict(os.environ, env, clear=True): with patch.dict(sys.modules, {"langsmith": None}): manager = ObservabilityManager() assert manager.project_name == "app-factory" def test_graceful_degradation_no_langsmith(self, obs): assert obs._client is None def test_logger_created(self, obs): assert obs.logger is not None assert obs.logger.level 
== logging.DEBUG assert obs.logger.propagate is False def test_claude_event_mode_from_env(self): with patch.dict(os.environ, {"APP_FACTORY_CLAUDE_EVENT_MODE": "verbose"}): with patch.dict(sys.modules, {"langsmith": None}): manager = ObservabilityManager("test-project") assert manager._claude_event_mode == "verbose" # --------------------------------------------------------------------------- # Tracing lifecycle # --------------------------------------------------------------------------- class TestTracing: def test_start_trace_returns_run_id(self, obs): run_id = obs.start_trace("pm_agent", "1.2") assert isinstance(run_id, str) assert len(run_id) == 32 # hex uuid def test_start_and_end_trace(self, obs): run_id = obs.start_trace("pm_agent", "1.2", inputs={"key": "val"}) obs.end_trace(run_id, outputs={"result": "ok"}) assert run_id not in obs._active_runs def test_end_trace_with_error(self, obs): run_id = obs.start_trace("pm_agent", "1.2") obs.end_trace(run_id, error="something went wrong") metrics = obs.get_metrics() assert metrics["total_errors"] == 1 def test_langsmith_create_run_called(self, obs_with_client): manager, mock_client = obs_with_client manager.start_trace("agent", "1.1", inputs={"x": 1}) mock_client.create_run.assert_called_once() def test_langsmith_update_run_called(self, obs_with_client): manager, mock_client = obs_with_client run_id = manager.start_trace("agent", "1.1") manager.end_trace(run_id, outputs={"y": 2}) mock_client.update_run.assert_called_once() def test_langsmith_failure_does_not_raise(self): mock_client = MagicMock() mock_client.create_run.side_effect = Exception("network error") mock_langsmith = MagicMock() mock_langsmith.Client.return_value = mock_client with patch.dict(sys.modules, {"langsmith": mock_langsmith}): manager = ObservabilityManager("test-project") # Should not raise run_id = manager.start_trace("agent", "1.1") assert isinstance(run_id, str) # --------------------------------------------------------------------------- # 
# Decorator
# ---------------------------------------------------------------------------


class TestDecorator:
    """trace_agent_execution decorator on sync and async callables."""

    def test_sync_decorator(self, obs):
        @obs.trace_agent_execution("agent", "1.1")
        def my_func(x):
            return x * 2

        result = my_func(5)
        assert result == 10
        assert obs.get_metrics()["total_traces"] == 1

    def test_async_decorator(self, obs):
        @obs.trace_agent_execution("agent", "1.1")
        async def my_async_func(x):
            return x + 1

        result = asyncio.run(my_async_func(10))
        assert result == 11
        assert obs.get_metrics()["total_traces"] == 1

    def test_decorator_records_error(self, obs):
        @obs.trace_agent_execution("agent", "1.1")
        def failing():
            raise ValueError("boom")

        with pytest.raises(ValueError, match="boom"):
            failing()
        assert obs.get_metrics()["total_errors"] == 1

    def test_decorator_preserves_function_name(self, obs):
        @obs.trace_agent_execution("agent", "1.1")
        def my_named_func():
            pass

        assert my_named_func.__name__ == "my_named_func"


# ---------------------------------------------------------------------------
# Async context manager
# ---------------------------------------------------------------------------


class TestAsyncContextManager:
    """trace_context async context manager: success and exception paths."""

    def test_trace_context_success(self, obs):
        async def run():
            async with obs.trace_context("agent", "1.1") as run_id:
                assert isinstance(run_id, str)
            return run_id

        run_id = asyncio.run(run())
        assert run_id not in obs._active_runs
        assert obs.get_metrics()["total_traces"] == 1

    def test_trace_context_exception(self, obs):
        async def run():
            with pytest.raises(RuntimeError):
                async with obs.trace_context("agent", "1.1") as run_id:
                    raise RuntimeError("test error")
            return run_id

        asyncio.run(run())
        assert obs.get_metrics()["total_errors"] == 1


# ---------------------------------------------------------------------------
# Async trace_agent helper
# ---------------------------------------------------------------------------


class TestTraceAgent:
    """trace_agent awaits a coroutine factory and records the trace."""

    def test_trace_agent_success(self, obs):
        async def do_test():
            async def my_coro():
                return 42

            return await obs.trace_agent("agent", "1.1", my_coro)

        result = asyncio.run(do_test())
        assert result == 42
        assert obs.get_metrics()["total_traces"] == 1


# ---------------------------------------------------------------------------
# Logging format
# ---------------------------------------------------------------------------


class TestLoggingFormat:
    """_StructuredFormatter output: fields, defaults, and ANSI color handling."""

    def test_structured_formatter(self):
        formatter = _StructuredFormatter(use_color=False)
        record = logging.LogRecord(
            name="test",
            level=logging.INFO,
            pathname="",
            lineno=0,
            msg="hello world",
            args=(),
            exc_info=None,
        )
        record.agent_name = "PM_AGENT"
        record.task_id = "task-1.2"
        formatted = formatter.format(record)
        assert "[PM_AGENT]" in formatted
        assert "[task-1.2]" in formatted
        assert "[INFO]" in formatted
        assert "hello world" in formatted
        # Check ISO timestamp pattern
        assert formatted.startswith("[20")
        assert "\x1b[" not in formatted

    def test_formatter_defaults(self):
        formatter = _StructuredFormatter(use_color=False)
        record = logging.LogRecord(
            name="test",
            level=logging.WARNING,
            pathname="",
            lineno=0,
            msg="warn msg",
            args=(),
            exc_info=None,
        )
        formatted = formatter.format(record)
        assert "[SYSTEM]" in formatted
        assert "[-]" in formatted

    def test_formatter_can_render_colors(self):
        formatter = _StructuredFormatter(use_color=True)
        record = logging.LogRecord(
            name="test",
            level=logging.INFO,
            pathname="",
            lineno=0,
            msg="Trace started: run_id=abc123",
            args=(),
            exc_info=None,
        )
        record.agent_name = "PM_AGENT"
        record.task_id = "task-1.2"
        formatted = formatter.format(record)
        assert "\x1b[" in formatted


# ---------------------------------------------------------------------------
# Logging methods
# ---------------------------------------------------------------------------


class TestLoggingMethods:
    """High-level log_* helpers and their effect on metrics and log output."""

    class _CaptureHandler(logging.Handler):
        """Collect emitted log messages for assertion."""

        def __init__(self):
            super().__init__()
            self.messages: list[str] = []

        def emit(self, record):
            self.messages.append(record.getMessage())

    def test_log_state_transition(self, obs):
        capture = self._CaptureHandler()
        obs.logger.addHandler(capture)
        obs.log_state_transition("idle", "processing", {"reason": "new task"})
        obs.logger.removeHandler(capture)
        assert any("idle -> processing" in m for m in capture.messages)

    def test_log_token_usage(self, obs):
        obs.log_token_usage("pm_agent", "1.1", input_tokens=100, output_tokens=50, model="claude-3")
        metrics = obs.get_metrics()
        assert metrics["total_tokens"] == 150
        assert metrics["per_agent"]["pm_agent"]["tokens"] == 150

    def test_log_error(self, obs):
        try:
            raise ValueError("test error")
        except ValueError as e:
            obs.log_error("pm_agent", "1.1", e, context={"step": "parse"})
        metrics = obs.get_metrics()
        assert metrics["total_errors"] == 1
        assert metrics["per_agent"]["pm_agent"]["errors"] == 1

    def test_log_claude_event(self, obs):
        obs.log_claude_event("pm_agent", "1.1", "tool_use", {"tool_name": "Bash"})
        obs.log_claude_event("pm_agent", "1.1", "result_message", {"subtype": "success"})
        metrics = obs.get_metrics()
        assert metrics["total_claude_events"] == 2
        assert metrics["total_tool_calls"] == 1
        assert metrics["per_agent"]["pm_agent"]["claude_events"] == 2
        assert metrics["per_agent"]["pm_agent"]["tool_calls"] == 1

    def test_log_claude_event_readable_format(self, obs):
        capture = self._CaptureHandler()
        obs.logger.addHandler(capture)
        obs.log_claude_event(
            "pm_agent",
            "1.1",
            "tool_use",
            {"tool_name": "Bash", "tool_use_id": "toolu_1", "tool_input": {"command": "pwd"}},
        )
        obs.logger.removeHandler(capture)
        combined = "\n".join(capture.messages)
        assert "Claude tool call: Bash" in combined
        assert "command=pwd" in combined

    def test_log_claude_event_noisy_tool_result_is_suppressed_on_success(self, obs):
        capture = self._CaptureHandler()
        obs.logger.addHandler(capture)
        obs.log_claude_event(
            "pm_agent",
            "1.1",
            "tool_use",
            {"tool_name": "Read", "tool_use_id": "toolu_1", "tool_input": {"file_path": "README.md"}},
        )
        obs.log_claude_event(
            "pm_agent",
            "1.1",
            "tool_result",
            {"tool_use_id": "toolu_1", "is_error": False, "content": "x" * 1000},
        )
        obs.logger.removeHandler(capture)
        combined = "\n".join(capture.messages)
        assert "Claude tool call: Read path=README.md" in combined
        assert "Claude tool result: Read" not in combined

    def test_log_claude_event_noisy_tool_result_logs_errors_with_context(self, obs):
        capture = self._CaptureHandler()
        obs.logger.addHandler(capture)
        obs.log_claude_event(
            "pm_agent",
            "1.1",
            "tool_use",
            {"tool_name": "Read", "tool_use_id": "toolu_1", "tool_input": {"file_path": "README.md"}},
        )
        obs.log_claude_event(
            "pm_agent",
            "1.1",
            "tool_result",
            {"tool_use_id": "toolu_1", "is_error": True, "content": "permission denied"},
        )
        obs.logger.removeHandler(capture)
        combined = "\n".join(capture.messages)
        assert "Claude tool result: Read status=error path=README.md error=permission denied" in combined

    def test_log_claude_event_filters_noise_by_default(self, obs):
        capture = self._CaptureHandler()
        obs.logger.addHandler(capture)
        obs.log_claude_event("pm_agent", "1.1", "stream_event", {"event": "delta"})
        obs.logger.removeHandler(capture)
        combined = "\n".join(capture.messages)
        assert "stream_event" not in combined

    def test_log_claude_event_verbose_mode_includes_noise(self):
        with patch.dict(sys.modules, {"langsmith": None}):
            manager = ObservabilityManager("test-project", claude_event_mode="verbose")
        capture = self._CaptureHandler()
        manager.logger.addHandler(capture)
        manager.log_claude_event("pm_agent", "1.1", "stream_event", {"event": "delta"})
        manager.logger.removeHandler(capture)
        combined = "\n".join(capture.messages)
        assert "Claude event: type=stream_event" in combined


# ---------------------------------------------------------------------------
# Metrics
# ---------------------------------------------------------------------------


class TestMetrics:
    """get_metrics totals and per-agent accumulation."""

    def test_initial_metrics(self, obs):
        metrics = obs.get_metrics()
        assert metrics["total_tokens"] == 0
        assert metrics["total_traces"] == 0
        assert metrics["total_errors"] == 0
        assert metrics["total_claude_events"] == 0
        assert metrics["total_tool_calls"] == 0

    def test_metrics_accumulate(self, obs):
        obs.start_trace("a1", "1.1")
        obs.start_trace("a1", "1.2")
        obs.start_trace("a2", "2.1")
        obs.log_token_usage("a1", "1.1", 10, 20)
        obs.log_token_usage("a2", "2.1", 5, 5)
        metrics = obs.get_metrics()
        assert metrics["total_traces"] == 3
        assert metrics["total_tokens"] == 40
        assert metrics["per_agent"]["a1"]["traces"] == 2
        assert metrics["per_agent"]["a2"]["traces"] == 1
        assert metrics["per_agent"]["a1"]["tokens"] == 30
        assert metrics["per_agent"]["a2"]["tokens"] == 10


# ---------------------------------------------------------------------------
# Graceful degradation
# ---------------------------------------------------------------------------


class TestGracefulDegradation:
    """All public methods must keep working when LangSmith is unavailable."""

    def test_all_operations_work_without_langsmith(self, obs):
        """Every public method should work fine with _client=None."""
        assert obs._client is None
        run_id = obs.start_trace("agent", "1.1", inputs={"a": 1})
        obs.end_trace(run_id, outputs={"b": 2})
        obs.end_trace("nonexistent-id", error="whatever")
        obs.log_state_transition("a", "b")
        obs.log_token_usage("agent", "1.1", 10, 20)
        try:
            raise RuntimeError("test")
        except RuntimeError as e:
            obs.log_error("agent", "1.1", e)
        metrics = obs.get_metrics()
        assert metrics["total_traces"] == 1