from typing import List, Dict, Any from datetime import datetime, timedelta import csv import io from app.services.external_api.base_external_service import BaseExternalService from app.models.tcgplayer_group import TCGPlayerGroup from app.models.tcgplayer_product import TCGPlayerProduct from app.models.tcgplayer_category import TCGPlayerCategory from sqlalchemy.orm import Session import py7zr import os class TCGCSVService(BaseExternalService): def __init__(self): super().__init__(base_url="https://tcgcsv.com/") async def get_groups(self, game_ids: List[int]) -> Dict[str, Any]: """Fetch groups for specific game IDs from TCGCSV API""" game_ids_str = ",".join(map(str, game_ids)) endpoint = f"tcgplayer/{game_ids_str}/groups" return await self._make_request("GET", endpoint) async def get_products_and_prices(self, game_ids: List[int], group_id: int) -> List[Dict[str, Any]]: """Fetch products and prices for a specific group from TCGCSV API""" game_ids_str = ",".join(map(str, game_ids)) endpoint = f"tcgplayer/{game_ids_str}/{group_id}/ProductsAndPrices.csv" response = await self._make_request("GET", endpoint, headers={"Accept": "text/csv"}) # Parse CSV response csv_data = io.StringIO(response) reader = csv.DictReader(csv_data) return list(reader) async def get_categories(self) -> Dict[str, Any]: """Fetch all categories from TCGCSV API""" endpoint = "tcgplayer/categories" return await self._make_request("GET", endpoint) async def get_archived_prices_for_date(self, date_str: str): """Fetch archived prices from TCGCSV API""" # Check if the date directory already exists extract_path = f"app/data/cache/tcgcsv/prices/{date_str}" if os.path.exists(extract_path): print(f"Prices for date {date_str} already exist, skipping download") return date_str # Download the archive file endpoint = f"archive/tcgplayer/prices-{date_str}.ppmd.7z" response = await self._make_request("GET", endpoint, binary=True) # Save the archive file archive_path = f"app/data/cache/tcgcsv/prices/zip/prices-{date_str}.ppmd.7z" os.makedirs(os.path.dirname(archive_path), exist_ok=True) with open(archive_path, "wb") as f: f.write(response) # Extract the 7z file with py7zr.SevenZipFile(archive_path, 'r') as archive: # Extract to a directory named after the date os.makedirs(extract_path, exist_ok=True) archive.extractall(path=extract_path) # The extracted files will be in a directory structure like: # {date_str}/{game_id}/{group_id}/prices return date_str async def get_archived_prices_for_date_range(self, start_date: str, end_date: str): """Fetch archived prices for a date range from TCGCSV API""" # Convert string dates to datetime objects start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d") # Set minimum start date min_start_date = datetime.strptime("2025-02-08", "%Y-%m-%d") if start_dt < min_start_date: start_dt = min_start_date # Set maximum end date to today today = datetime.now() if end_dt > today: end_dt = today # Generate date range date_range = [] current_dt = start_dt while current_dt <= end_dt: date_range.append(current_dt.strftime("%Y-%m-%d")) current_dt += timedelta(days=1) # Process each date for date_str in date_range: await self.get_archived_prices_for_date(date_str) async def sync_groups_to_db(self, db: Session, game_ids: List[int]) -> List[TCGPlayerGroup]: """Fetch groups from API and sync them to the database""" response = await self.get_groups(game_ids) if not response.get("success"): raise Exception(f"Failed to fetch groups: {response.get('errors')}") groups = response.get("results", []) synced_groups = [] for group_data in groups: # Convert string dates to datetime objects published_on = datetime.fromisoformat(group_data["publishedOn"].replace("Z", "+00:00")) if group_data.get("publishedOn") else None modified_on = datetime.fromisoformat(group_data["modifiedOn"].replace("Z", "+00:00")) if group_data.get("modifiedOn") else None # Check if group already exists existing_group = db.query(TCGPlayerGroup).filter(TCGPlayerGroup.group_id == group_data["groupId"]).first() if existing_group: # Update existing group for key, value in { "name": group_data["name"], "abbreviation": group_data.get("abbreviation"), "is_supplemental": group_data.get("isSupplemental", False), "published_on": published_on, "modified_on": modified_on, "category_id": group_data.get("categoryId") }.items(): setattr(existing_group, key, value) synced_groups.append(existing_group) else: # Create new group new_group = TCGPlayerGroup( group_id=group_data["groupId"], name=group_data["name"], abbreviation=group_data.get("abbreviation"), is_supplemental=group_data.get("isSupplemental", False), published_on=published_on, modified_on=modified_on, category_id=group_data.get("categoryId") ) db.add(new_group) synced_groups.append(new_group) db.commit() return synced_groups async def sync_products_to_db(self, db: Session, game_id: int, group_id: int) -> List[TCGPlayerProduct]: """Fetch products and prices for a group and sync them to the database""" products_data = await self.get_products_and_prices(game_id, group_id) synced_products = [] for product_data in products_data: # Convert string dates to datetime objects modified_on = datetime.fromisoformat(product_data["modifiedOn"].replace("Z", "+00:00")) if product_data.get("modifiedOn") else None # Convert price strings to floats, handling empty strings def parse_price(price_str): return float(price_str) if price_str else None # Check if product already exists existing_product = db.query(TCGPlayerProduct).filter(TCGPlayerProduct.product_id == int(product_data["productId"])).first() if existing_product: # Update existing product for key, value in { "name": product_data["name"], "clean_name": product_data.get("cleanName"), "image_url": product_data.get("imageUrl"), "category_id": int(product_data["categoryId"]), "group_id": int(product_data["groupId"]), "url": product_data.get("url"), "modified_on": modified_on, "image_count": int(product_data.get("imageCount", 0)), "ext_rarity": product_data.get("extRarity"), "ext_number": product_data.get("extNumber"), "low_price": parse_price(product_data.get("lowPrice")), "mid_price": parse_price(product_data.get("midPrice")), "high_price": parse_price(product_data.get("highPrice")), "market_price": parse_price(product_data.get("marketPrice")), "direct_low_price": parse_price(product_data.get("directLowPrice")), "sub_type_name": product_data.get("subTypeName") }.items(): setattr(existing_product, key, value) synced_products.append(existing_product) else: # Create new product new_product = TCGPlayerProduct( product_id=int(product_data["productId"]), name=product_data["name"], clean_name=product_data.get("cleanName"), image_url=product_data.get("imageUrl"), category_id=int(product_data["categoryId"]), group_id=int(product_data["groupId"]), url=product_data.get("url"), modified_on=modified_on, image_count=int(product_data.get("imageCount", 0)), ext_rarity=product_data.get("extRarity"), ext_number=product_data.get("extNumber"), low_price=parse_price(product_data.get("lowPrice")), mid_price=parse_price(product_data.get("midPrice")), high_price=parse_price(product_data.get("highPrice")), market_price=parse_price(product_data.get("marketPrice")), direct_low_price=parse_price(product_data.get("directLowPrice")), sub_type_name=product_data.get("subTypeName") ) db.add(new_product) synced_products.append(new_product) db.commit() return synced_products async def sync_categories_to_db(self, db: Session) -> List[TCGPlayerCategory]: """Fetch categories from API and sync them to the database""" response = await self.get_categories() if not response.get("success"): raise Exception(f"Failed to fetch categories: {response.get('errors')}") categories = response.get("results", []) synced_categories = [] for category_data in categories: # Convert string dates to datetime objects modified_on = datetime.fromisoformat(category_data["modifiedOn"].replace("Z", "+00:00")) if category_data.get("modifiedOn") else None # Check if category already exists existing_category = db.query(TCGPlayerCategory).filter(TCGPlayerCategory.category_id == category_data["categoryId"]).first() if existing_category: # Update existing category for key, value in { "name": category_data["name"], "display_name": category_data.get("displayName"), "seo_category_name": category_data.get("seoCategoryName"), "category_description": category_data.get("categoryDescription"), "category_page_title": category_data.get("categoryPageTitle"), "sealed_label": category_data.get("sealedLabel"), "non_sealed_label": category_data.get("nonSealedLabel"), "condition_guide_url": category_data.get("conditionGuideUrl"), "is_scannable": category_data.get("isScannable", False), "popularity": category_data.get("popularity", 0), "is_direct": category_data.get("isDirect", False), "modified_on": modified_on }.items(): setattr(existing_category, key, value) synced_categories.append(existing_category) else: # Create new category new_category = TCGPlayerCategory( category_id=category_data["categoryId"], name=category_data["name"], display_name=category_data.get("displayName"), seo_category_name=category_data.get("seoCategoryName"), category_description=category_data.get("categoryDescription"), category_page_title=category_data.get("categoryPageTitle"), sealed_label=category_data.get("sealedLabel"), non_sealed_label=category_data.get("nonSealedLabel"), condition_guide_url=category_data.get("conditionGuideUrl"), is_scannable=category_data.get("isScannable", False), popularity=category_data.get("popularity", 0), is_direct=category_data.get("isDirect", False), modified_on=modified_on ) db.add(new_category) synced_categories.append(new_category) db.commit() return synced_categories