import os
import json
import zipfile
import aiohttp
import asyncio
import time
import sys
from typing import Dict, Any, Optional, Generator
from sqlalchemy.orm import Session
from datetime import datetime
from app.models.mtgjson_card import MTGJSONCard
from app.models.mtgjson_sku import MTGJSONSKU
from app.db.database import get_db, transaction
from app.services.external_api.base_external_service import BaseExternalService
from app.schemas.file import FileInDB


class MTGJSONService(BaseExternalService):
    """Downloads MTGJSON bulk exports (AllIdentifiers / TcgplayerSkus) and
    imports them into the local database in batches, with console progress
    reporting."""

    # Maps MTGJSONCard column names to the camelCase keys found in the
    # "identifiers" object of each MTGJSON card entry. Shared by the
    # identifiers parser and by _process_batch so the two never drift apart.
    _CARD_IDENTIFIER_FIELDS = {
        "abu_id": "abuId",
        "card_kingdom_etched_id": "cardKingdomEtchedId",
        "card_kingdom_foil_id": "cardKingdomFoilId",
        "card_kingdom_id": "cardKingdomId",
        "cardsphere_id": "cardsphereId",
        "cardsphere_foil_id": "cardsphereFoilId",
        "cardtrader_id": "cardtraderId",
        "csi_id": "csiId",
        "mcm_id": "mcmId",
        "mcm_meta_id": "mcmMetaId",
        "miniaturemarket_id": "miniaturemarketId",
        "mtg_arena_id": "mtgArenaId",
        "mtgjson_foil_version_id": "mtgjsonFoilVersionId",
        "mtgjson_non_foil_version_id": "mtgjsonNonFoilVersionId",
        "mtgjson_v4_id": "mtgjsonV4Id",
        "mtgo_foil_id": "mtgoFoilId",
        "mtgo_id": "mtgoId",
        "multiverse_id": "multiverseId",
        "scg_id": "scgId",
        "scryfall_id": "scryfallId",
        "scryfall_card_back_id": "scryfallCardBackId",
        "scryfall_oracle_id": "scryfallOracleId",
        "scryfall_illustration_id": "scryfallIllustrationId",
        "tcgplayer_product_id": "tcgplayerProductId",
        "tcgplayer_etched_product_id": "tcgplayerEtchedProductId",
        "tnt_id": "tntId",
    }

    def __init__(self, cache_dir: str = "app/data/cache/mtgjson", batch_size: int = 1000):
        super().__init__(base_url="https://mtgjson.com/api/v5/")
        self.cache_dir = cache_dir
        self.identifiers_dir = os.path.join(cache_dir, "identifiers")
        self.skus_dir = os.path.join(cache_dir, "skus")
        self.batch_size = batch_size
        # Create the cache layout up front so downloads/extractions never
        # fail on a missing directory.
        os.makedirs(cache_dir, exist_ok=True)
        os.makedirs(self.identifiers_dir, exist_ok=True)
        os.makedirs(self.skus_dir, exist_ok=True)

    def _format_progress(self, current: int, total: int, start_time: float) -> str:
        """Format a progress message with percentage and timing information."""
        elapsed = time.time() - start_time
        # Guard every division: elapsed can be 0 on the very first update.
        items_per_second = current / elapsed if elapsed > 0 else 0
        if total > 0:
            percent = (current / total) * 100
            eta = (total - current) / items_per_second if items_per_second > 0 else 0
            return f"[{current}/{total} ({percent:.1f}%)] {items_per_second:.1f} items/sec, ETA: {eta:.1f}s"
        return f"[{current} items] {items_per_second:.1f} items/sec"

    def _print_progress(self, message: str, end: str = "\n") -> None:
        """Print a progress message immediately (flushed so \\r updates render)."""
        print(message, end=end, flush=True)

    async def _download_file(self, db: Session, url: str, filename: str, subdir: str) -> FileInDB:
        """Download *url* and persist the payload via the base-class file service.

        Returns the FileInDB record describing the stored file.
        Raises Exception on any non-200 response.
        """
        print(f"Downloading {url}...")
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    raise Exception(f"Failed to download file from {url}. Status: {response.status}")
                file_data = await response.read()
                return await self.save_file(
                    db=db,
                    file_data=file_data,
                    file_name=filename,
                    subdir=f"mtgjson/{subdir}",
                    file_type=response.headers.get('content-type', 'application/octet-stream')
                )

    async def _unzip_file(self, zip_path: str, extract_dir: str) -> str:
        """Extract *zip_path* into *extract_dir* and return the path to the
        first (and, for MTGJSON archives, only) member — the JSON file."""
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            json_filename = zip_ref.namelist()[0]
            zip_ref.extractall(extract_dir)
            return os.path.join(extract_dir, json_filename)

    def _stream_json_file(self, file_path: str) -> Generator[Dict[str, Any], None, None]:
        """Yield the "meta" section, then each entry of the "data" section.

        NOTE: despite the name, the whole document is loaded into memory —
        MTGJSON exports are a single JSON object, not line-delimited.
        """
        print(f"Starting to stream JSON file: {file_path}")
        with open(file_path, 'r') as f:
            data = json.load(f)
        if "meta" in data:
            yield {"type": "meta", "data": data["meta"]}
        if "data" in data:
            for key, value in data["data"].items():
                yield {"type": "item", "data": {key: value}}

    async def _process_batch(self, db: Session, items: list, model_class) -> int:
        """Insert a batch of pre-parsed card or SKU dicts.

        Rows whose primary identifier already exists in the database (or
        earlier in the same batch) are skipped. Returns the number of rows
        actually inserted.
        """
        if not items:
            return 0
        is_card = model_class == MTGJSONCard
        id_key = "card_id" if is_card else "sku_id"
        id_column = MTGJSONCard.card_id if is_card else MTGJSONSKU.sku_id
        # One IN(...) query for the whole batch instead of one SELECT per item.
        batch_ids = [item[id_key] for item in items]
        existing_ids = {row[0] for row in db.query(id_column).filter(id_column.in_(batch_ids))}
        processed = 0
        with transaction(db):
            for item in items:
                if item[id_key] in existing_ids:
                    continue
                existing_ids.add(item[id_key])  # dedupe repeats within this batch
                if is_card:
                    new_item = MTGJSONCard(
                        card_id=item["card_id"],
                        name=item["name"],
                        set_code=item["set_code"],
                        uuid=item["uuid"],
                        # All external-identifier columns come from the shared mapping.
                        **{field: item.get(field) for field in self._CARD_IDENTIFIER_FIELDS},
                    )
                else:  # MTGJSONSKU
                    new_item = MTGJSONSKU(
                        sku_id=str(item["sku_id"]),
                        product_id=str(item["product_id"]),
                        condition=item["condition"],
                        finish=item["finish"],
                        language=item["language"],
                        printing=item["printing"],
                        card_id=item["card_id"],
                    )
                db.add(new_item)
                processed += 1
        return processed

    async def download_and_process_identifiers(self, db: Session) -> Dict[str, int]:
        """Download, extract and import AllIdentifiers.json.zip.

        Returns {"cards_processed": <number of new cards inserted>}.
        """
        self._print_progress("Starting MTGJSON identifiers processing...")
        start_time = time.time()

        file_record = await self._download_file(
            db=db,
            url="https://mtgjson.com/api/v5/AllIdentifiers.json.zip",
            filename="AllIdentifiers.json.zip",
            subdir="identifiers"
        )

        # BUG FIX: the download is a zip archive — json.load cannot parse raw
        # zip bytes, so extract it first (this is what _unzip_file is for).
        json_path = await self._unzip_file(file_record.path, self.identifiers_dir)

        cards_processed = 0
        current_batch = []
        total_cards = 0
        last_progress_time = time.time()

        self._print_progress("Processing cards...")
        try:
            for item in self._stream_json_file(json_path):
                if item["type"] == "meta":
                    self._print_progress(f"Processing MTGJSON data version {item['data'].get('version')} from {item['data'].get('date')}")
                    continue

                # Each item is {card_uuid: card_object}.
                card_data = item["data"]
                card_id = next(iter(card_data))
                card_info = card_data[card_id]
                identifiers = card_info.get("identifiers", {})
                total_cards += 1

                record = {
                    "card_id": card_id,
                    "name": card_info.get("name"),
                    "set_code": card_info.get("setCode"),
                    "uuid": card_info.get("uuid"),
                    "data": card_info,
                }
                # Pull every known external identifier via the shared mapping.
                for field, key in self._CARD_IDENTIFIER_FIELDS.items():
                    record[field] = identifiers.get(key)
                current_batch.append(record)

                if len(current_batch) >= self.batch_size:
                    cards_processed += await self._process_batch(db, current_batch, MTGJSONCard)
                    current_batch = []
                    now = time.time()
                    if now - last_progress_time >= 1.0:  # throttle to ~1 update/sec
                        self._print_progress(f"\r{self._format_progress(cards_processed, total_cards, start_time)}", end="")
                        last_progress_time = now
        except Exception as e:
            self._print_progress(f"\nError during processing: {str(e)}")
            raise

        # Flush the final partial batch.
        if current_batch:
            cards_processed += await self._process_batch(db, current_batch, MTGJSONCard)

        total_time = time.time() - start_time
        self._print_progress(f"\nProcessing complete! Processed {cards_processed} cards in {total_time:.1f} seconds")
        return {"cards_processed": cards_processed}

    async def download_and_process_skus(self, db: Session) -> Dict[str, int]:
        """Download, extract and import TcgplayerSkus.json.zip.

        Returns {"skus_processed": <number of new SKUs inserted>}.
        """
        self._print_progress("Starting MTGJSON SKUs processing...")
        start_time = time.time()

        file_record = await self._download_file(
            db=db,
            url="https://mtgjson.com/api/v5/TcgplayerSkus.json.zip",
            filename="TcgplayerSkus.json.zip",
            subdir="skus"
        )

        # BUG FIX: extract the zip before parsing — json.load cannot read
        # the raw archive bytes.
        json_path = await self._unzip_file(file_record.path, self.skus_dir)

        skus_processed = 0
        current_batch = []
        total_skus = 0
        last_progress_time = time.time()

        self._print_progress("Processing SKUs...")
        try:
            for item in self._stream_json_file(json_path):
                if item["type"] == "meta":
                    self._print_progress(f"Processing MTGJSON SKUs version {item['data'].get('version')} from {item['data'].get('date')}")
                    continue

                # Each data entry is {card_uuid: [sku, sku, ...]}.
                for card_uuid, sku_list in item["data"].items():
                    for sku in sku_list:
                        total_skus += 1
                        current_batch.append({
                            "sku_id": str(sku.get("skuId")),
                            "product_id": str(sku.get("productId")),
                            "condition": sku.get("condition"),
                            "finish": sku.get("finish"),
                            "language": sku.get("language"),
                            "printing": sku.get("printing"),
                            "card_id": card_uuid,
                            "data": sku
                        })

                        if len(current_batch) >= self.batch_size:
                            skus_processed += await self._process_batch(db, current_batch, MTGJSONSKU)
                            current_batch = []
                            now = time.time()
                            if now - last_progress_time >= 1.0:  # throttle to ~1 update/sec
                                self._print_progress(f"\r{self._format_progress(skus_processed, total_skus, start_time)}", end="")
                                last_progress_time = now
        except Exception as e:
            self._print_progress(f"\nError during processing: {str(e)}")
            raise

        # Flush the final partial batch.
        if current_batch:
            skus_processed += await self._process_batch(db, current_batch, MTGJSONSKU)

        total_time = time.time() - start_time
        self._print_progress(f"\nProcessing complete! Processed {skus_processed} SKUs in {total_time:.1f} seconds")
        return {"skus_processed": skus_processed}

    async def clear_cache(self) -> None:
        """Delete every cached file under the identifiers/ and skus/ subdirs
        (directories themselves are kept)."""
        for subdir in ["identifiers", "skus"]:
            dir_path = os.path.join(self.cache_dir, subdir)
            if not os.path.exists(dir_path):
                continue
            for filename in os.listdir(dir_path):
                file_path = os.path.join(dir_path, filename)
                if os.path.isfile(file_path):
                    os.unlink(file_path)
        print("MTGJSON cache cleared")