ai_giga_tcg/app/services/external_api/base_external_service.py
2025-04-09 21:02:43 -04:00

49 lines
1.7 KiB
Python

from typing import Any, Dict, Optional
import aiohttp
import logging
from app.services.service_registry import ServiceRegistry
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class BaseExternalService:
    """Base class for clients that call an external HTTP API.

    An instance keeps the API's base URL, an optional API key, and a
    lazily created ``aiohttp.ClientSession`` that is reused for every
    request issued through ``_make_request``.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None):
        """Store the connection settings and register this instance.

        Args:
            base_url: Prefix that is prepended verbatim to each endpoint.
            api_key: When truthy, sent as a ``Bearer`` token in the
                ``Authorization`` header of every request.
        """
        self.base_url = base_url
        self.api_key = api_key
        # No session yet; _get_session() creates one on first use.
        self.session: Optional["aiohttp.ClientSession"] = None
        # Register the service instance under its concrete class name.
        ServiceRegistry.register(self.__class__.__name__, self)

    async def _get_session(self) -> "aiohttp.ClientSession":
        """Return the current session, creating one if missing or closed."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    async def _make_request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        data: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send one request and return the decoded JSON response body.

        Args:
            method: HTTP method passed through to ``session.request``.
            endpoint: Appended verbatim to ``base_url`` to form the URL.
            params: Optional query-string parameters.
            headers: Optional extra headers. The mapping is copied, so
                the caller's dict is never modified.
            data: Optional payload, sent via the ``json=`` argument.

        Returns:
            Whatever ``response.json()`` yields for the response.

        Raises:
            aiohttp.ClientError: Logged and re-raised; includes the error
                raised by ``raise_for_status()`` for error responses.
            Exception: Any other failure is logged and re-raised as is.
        """
        session = await self._get_session()
        url = f"{self.base_url}{endpoint}"

        # Copy before adding the token: writing into the caller's dict
        # would leak the Authorization header into caller-owned state.
        request_headers = None if headers is None else dict(headers)
        if self.api_key:
            request_headers = request_headers or {}
            request_headers["Authorization"] = f"Bearer {self.api_key}"

        try:
            async with session.request(
                method,
                url,
                params=params,
                headers=request_headers,
                json=data,
            ) as response:
                response.raise_for_status()
                return await response.json()
        except aiohttp.ClientError as e:
            logger.error("API request failed: %s", e)
            raise
        except Exception as e:
            logger.error("Unexpected error during API request: %s", e)
            raise

    async def close(self) -> None:
        """Close the underlying session if one exists and is still open."""
        if self.session is not None and not self.session.closed:
            await self.session.close()