# 2025-04-09 23:53:05 -04:00
#
# 206 lines
# 10 KiB
# Python

import csv
import io
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from sqlalchemy.orm import Session

from app.models.tcgplayer_category import TCGPlayerCategory
from app.models.tcgplayer_group import TCGPlayerGroup
from app.models.tcgplayer_product import TCGPlayerProduct
from app.services.external_api.base_external_service import BaseExternalService
class TCGCSVService(BaseExternalService):
    """Client for the TCGCSV API that can also persist what it fetches.

    The ``get_*`` coroutines fetch groups, categories and per-group
    product/price rows through ``self._make_request``. The
    ``sync_*_to_db`` coroutines insert or update the matching
    ``TCGPlayerGroup`` / ``TCGPlayerProduct`` / ``TCGPlayerCategory``
    records and commit the session.
    """

    def __init__(self):
        """Initialise the base service with the TCGCSV base URL."""
        super().__init__(base_url="https://tcgcsv.com/tcgplayer/")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _ids_path(game_ids: Union[int, List[int]]) -> str:
        """Return the comma-separated URL segment for one or more game IDs.

        A bare ID is accepted as well as a list, because
        ``sync_products_to_db`` forwards a single ``game_id``; joining over
        a plain int would raise ``TypeError``.
        """
        if isinstance(game_ids, (int, str)):
            game_ids = [game_ids]
        return ",".join(map(str, game_ids))

    @staticmethod
    def _parse_timestamp(value: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-8601 string into a ``datetime``; empty values give ``None``."""
        if not value:
            return None
        # A trailing "Z" is rewritten as an explicit UTC offset before parsing.
        return datetime.fromisoformat(value.replace("Z", "+00:00"))

    @staticmethod
    def _parse_price(value: Optional[str]) -> Optional[float]:
        """Convert a price string to ``float``; empty or missing values give ``None``."""
        return float(value) if value else None

    @staticmethod
    def _parse_int(value: Any, default: int = 0) -> int:
        """Return ``int(value)``, or ``default`` for ``None`` and empty strings.

        CSV cells that are present but blank arrive as ``""`` (and cells
        missing from a short row as ``None``), which ``int()`` rejects.
        """
        if value is None or value == "":
            return default
        return int(value)

    @staticmethod
    def _upsert(db: Session, model: Any, key_name: str, key_value: Any,
                values: Dict[str, Any]) -> Any:
        """Update the row whose ``key_name`` equals ``key_value``, or create it.

        Args:
            db: Session used for the lookup and, for new rows, ``db.add``.
            model: Mapped class to query and instantiate.
            key_name: Name of the attribute that identifies the row.
            key_value: Identifier value to look up.
            values: Remaining attribute values to set on the row.

        Returns:
            The existing (updated) or newly added model instance.
        """
        record = db.query(model).filter(getattr(model, key_name) == key_value).first()
        if record is None:
            record = model(**{key_name: key_value}, **values)
            db.add(record)
        else:
            for attr, value in values.items():
                setattr(record, attr, value)
        return record

    @staticmethod
    def _commit(db: Session) -> None:
        """Commit the session; on failure roll back and re-raise.

        Rolling back leaves the session usable by the caller after a
        failed commit instead of stuck in a failed transaction.
        """
        try:
            db.commit()
        except Exception:
            db.rollback()
            raise

    # ------------------------------------------------------------------
    # Fetching
    # ------------------------------------------------------------------

    async def get_groups(self, game_ids: Union[int, List[int]]) -> Dict[str, Any]:
        """Fetch groups for specific game IDs from TCGCSV API"""
        endpoint = f"{self._ids_path(game_ids)}/groups"
        return await self._make_request("GET", endpoint)

    async def get_products_and_prices(
        self, game_ids: Union[int, List[int]], group_id: int
    ) -> List[Dict[str, Any]]:
        """Fetch products and prices for a specific group from TCGCSV API.

        Args:
            game_ids: One game ID or a list of them.
            group_id: Group whose ``ProductsAndPrices.csv`` is requested.

        Returns:
            One dict per CSV row, keyed by the CSV header names; all values
            are strings as produced by ``csv.DictReader``.
        """
        endpoint = f"{self._ids_path(game_ids)}/{group_id}/ProductsAndPrices.csv"
        response = await self._make_request("GET", endpoint, headers={"Accept": "text/csv"})
        # Parse the CSV text into a list of row dicts.
        return list(csv.DictReader(io.StringIO(response)))

    async def get_categories(self) -> Dict[str, Any]:
        """Fetch all categories from TCGCSV API"""
        return await self._make_request("GET", "categories")

    # ------------------------------------------------------------------
    # Syncing
    # ------------------------------------------------------------------

    async def sync_groups_to_db(self, db: Session, game_ids: List[int]) -> List[TCGPlayerGroup]:
        """Fetch groups from API and sync them to the database.

        Args:
            db: Session used for lookups, inserts and the final commit.
            game_ids: Game IDs whose groups are fetched.

        Returns:
            The created or updated ``TCGPlayerGroup`` objects, in response order.

        Raises:
            Exception: If the response does not report ``success``.
        """
        response = await self.get_groups(game_ids)
        if not response.get("success"):
            raise Exception(f"Failed to fetch groups: {response.get('errors')}")

        synced_groups = []
        # ``or []`` also covers a response whose "results" key is present but null.
        for group_data in response.get("results") or []:
            values = {
                "name": group_data["name"],
                "abbreviation": group_data.get("abbreviation"),
                "is_supplemental": group_data.get("isSupplemental", False),
                "published_on": self._parse_timestamp(group_data.get("publishedOn")),
                "modified_on": self._parse_timestamp(group_data.get("modifiedOn")),
                "category_id": group_data.get("categoryId"),
            }
            synced_groups.append(
                self._upsert(db, TCGPlayerGroup, "group_id", group_data["groupId"], values)
            )

        self._commit(db)
        return synced_groups

    async def sync_products_to_db(self, db: Session, game_id: int,
                                  group_id: int) -> List[TCGPlayerProduct]:
        """Fetch products and prices for a group and sync them to the database.

        Args:
            db: Session used for lookups, inserts and the final commit.
            game_id: Game ID the group belongs to.
            group_id: Group whose products are fetched.

        Returns:
            The created or updated ``TCGPlayerProduct`` objects, one per CSV row.
        """
        products_data = await self.get_products_and_prices(game_id, group_id)

        synced_products = []
        for product_data in products_data:
            values = {
                "name": product_data["name"],
                "clean_name": product_data.get("cleanName"),
                "image_url": product_data.get("imageUrl"),
                "category_id": int(product_data["categoryId"]),
                "group_id": int(product_data["groupId"]),
                "url": product_data.get("url"),
                "modified_on": self._parse_timestamp(product_data.get("modifiedOn")),
                "image_count": self._parse_int(product_data.get("imageCount")),
                "ext_rarity": product_data.get("extRarity"),
                "ext_number": product_data.get("extNumber"),
                "low_price": self._parse_price(product_data.get("lowPrice")),
                "mid_price": self._parse_price(product_data.get("midPrice")),
                "high_price": self._parse_price(product_data.get("highPrice")),
                "market_price": self._parse_price(product_data.get("marketPrice")),
                "direct_low_price": self._parse_price(product_data.get("directLowPrice")),
                "sub_type_name": product_data.get("subTypeName"),
            }
            synced_products.append(
                self._upsert(
                    db, TCGPlayerProduct, "product_id", int(product_data["productId"]), values
                )
            )

        self._commit(db)
        return synced_products

    async def sync_categories_to_db(self, db: Session) -> List[TCGPlayerCategory]:
        """Fetch categories from API and sync them to the database.

        Args:
            db: Session used for lookups, inserts and the final commit.

        Returns:
            The created or updated ``TCGPlayerCategory`` objects, in response order.

        Raises:
            Exception: If the response does not report ``success``.
        """
        response = await self.get_categories()
        if not response.get("success"):
            raise Exception(f"Failed to fetch categories: {response.get('errors')}")

        synced_categories = []
        # ``or []`` also covers a response whose "results" key is present but null.
        for category_data in response.get("results") or []:
            values = {
                "name": category_data["name"],
                "display_name": category_data.get("displayName"),
                "seo_category_name": category_data.get("seoCategoryName"),
                "category_description": category_data.get("categoryDescription"),
                "category_page_title": category_data.get("categoryPageTitle"),
                "sealed_label": category_data.get("sealedLabel"),
                "non_sealed_label": category_data.get("nonSealedLabel"),
                "condition_guide_url": category_data.get("conditionGuideUrl"),
                "is_scannable": category_data.get("isScannable", False),
                "popularity": category_data.get("popularity", 0),
                "is_direct": category_data.get("isDirect", False),
                "modified_on": self._parse_timestamp(category_data.get("modifiedOn")),
            }
            synced_categories.append(
                self._upsert(
                    db, TCGPlayerCategory, "category_id", category_data["categoryId"], values
                )
            )

        self._commit(db)
        return synced_categories