2025-04-18 15:19:57 -04:00

52 lines
2.3 KiB
Python

import csv
import io
from datetime import datetime, timedelta
from typing import Any, Dict, List, Union

from app.services.external_api.base_external_service import BaseExternalService
class TCGCSVService(BaseExternalService):
    """Client for the TCGCSV endpoints served from https://tcgcsv.com/.

    Every network call is delegated to ``_make_request``, inherited from
    ``BaseExternalService``. What that helper actually returns for each call
    is decided there, so the return annotations on the request methods
    describe the expected payload rather than something enforced here.
    """

    # Date format used both for accepted string arguments and for the
    # strings produced by get_tcgcsv_date_range.
    _DATE_FORMAT = "%Y-%m-%d"

    # Lower bound applied to every requested date range.
    # NOTE(review): presumably the first day an archive exists - confirm.
    _EARLIEST_DATE = datetime(2024, 2, 8).date()

    def __init__(self):
        """Initialise the base service with the TCGCSV base URL."""
        super().__init__(base_url="https://tcgcsv.com/")

    async def get_groups(self, game_id: int) -> Dict[str, Any]:
        """Fetch the groups of one game from the TCGCSV API.

        Args:
            game_id: Identifier interpolated into the request path.

        Returns:
            Whatever ``_make_request`` yields for
            ``GET tcgplayer/{game_id}/groups``.
        """
        endpoint = f"tcgplayer/{game_id}/groups"
        return await self._make_request("GET", endpoint)

    async def get_products_and_prices(self, game_id: str, group_id: int) -> str:
        """Fetch the products-and-prices CSV of one group from the TCGCSV API.

        Args:
            game_id: Identifier interpolated into the request path.
            group_id: Group identifier interpolated into the request path.

        Returns:
            Whatever ``_make_request`` yields for the CSV endpoint; the
            request is sent with an ``Accept: text/csv`` header.
        """
        endpoint = f"tcgplayer/{game_id}/{group_id}/ProductsAndPrices.csv"
        return await self._make_request(
            "GET", endpoint, headers={"Accept": "text/csv"}
        )

    async def get_categories(self) -> Dict[str, Any]:
        """Fetch all categories from the TCGCSV API.

        Returns:
            Whatever ``_make_request`` yields for ``GET tcgplayer/categories``.
        """
        endpoint = "tcgplayer/categories"
        return await self._make_request("GET", endpoint)

    async def get_archived_prices_for_date(self, date_str: str) -> bytes:
        """Fetch the archived price file of a single day from the TCGCSV API.

        Args:
            date_str: Date text interpolated into the archive file name
                ``prices-{date_str}.ppmd.7z``.

        Returns:
            Whatever ``_make_request`` yields when called with
            ``binary=True``.
        """
        endpoint = f"archive/tcgplayer/prices-{date_str}.ppmd.7z"
        return await self._make_request("GET", endpoint, binary=True)

    @classmethod
    def _to_day(cls, value: Union[str, datetime]):
        """Return the calendar day (a ``datetime.date``) denoted by *value*.

        ``datetime`` objects are truncated to their date; anything else is
        parsed as a ``YYYY-MM-DD`` string, so malformed text raises
        ``ValueError`` and unsupported types raise ``TypeError``.
        """
        if isinstance(value, datetime):
            return value.date()
        return datetime.strptime(value, cls._DATE_FORMAT).date()

    async def get_tcgcsv_date_range(
        self,
        start_date: Union[str, datetime],
        end_date: Union[str, datetime],
    ) -> List[str]:
        """Return every day between two dates as ``YYYY-MM-DD`` strings.

        The range is inclusive on both ends. The start is raised to
        ``_EARLIEST_DATE`` when it is earlier, and the end is lowered to
        today's local date when it lies in the future.

        The method performs no I/O; it remains a coroutine so existing
        ``await`` call sites keep working.

        Args:
            start_date: First requested day, as a ``YYYY-MM-DD`` string or a
                ``datetime`` (its time of day is ignored).
            end_date: Last requested day, in the same forms.

        Returns:
            The days in ascending order, or an empty list when the clamped
            start falls after the clamped end.

        Raises:
            ValueError: If a string argument is not in ``YYYY-MM-DD`` form.
            TypeError: If an argument is neither ``str`` nor ``datetime``.
        """
        first_day = max(self._to_day(start_date), self._EARLIEST_DATE)
        last_day = min(self._to_day(end_date), datetime.now().date())

        # A negative span produces an empty range, hence an empty list.
        span_days = (last_day - first_day).days
        return [
            (first_day + timedelta(days=offset)).strftime(self._DATE_FORMAT)
            for offset in range(span_days + 1)
        ]

    async def get_archived_prices_for_date_range(
        self,
        start_date: Union[str, datetime],
        end_date: Union[str, datetime],
    ) -> List[str]:
        """Return the archive dates covered by a requested date range.

        Despite its name this method does not download anything: it only
        returns the clamped list of ``YYYY-MM-DD`` strings computed by
        ``get_tcgcsv_date_range``. Each entry can be passed to
        ``get_archived_prices_for_date`` to fetch that day's archive.

        Args:
            start_date: First requested day, as a ``YYYY-MM-DD`` string or a
                ``datetime``.
            end_date: Last requested day, in the same forms.

        Returns:
            The days in ascending order as ``YYYY-MM-DD`` strings.
        """
        return await self.get_tcgcsv_date_range(start_date, end_date)