import json
import logging
import os
import time
import zipfile
from typing import Any, Dict, Generator, Optional

from sqlalchemy.orm import Session

from app.schemas.file import FileInDB
from app.services.external_api.base_external_service import BaseExternalService

logger = logging.getLogger(__name__)


class MTGJSONService(BaseExternalService):
    """Downloads, caches, unzips, and streams MTGJSON v5 bulk-data files
    (AllIdentifiers, TcgplayerSkus) via the shared file service."""

    def __init__(self, cache_dir: str = "app/data/cache/mtgjson"):
        super().__init__(base_url="https://mtgjson.com/api/v5/")
        # Ensure the cache directory and its per-dataset subdirectories exist.
        os.makedirs(cache_dir, exist_ok=True)
        self.cache_dir = cache_dir
        self.identifiers_dir = os.path.join(cache_dir, "identifiers")
        self.skus_dir = os.path.join(cache_dir, "skus")
        os.makedirs(self.identifiers_dir, exist_ok=True)
        os.makedirs(self.skus_dir, exist_ok=True)

    def _format_progress(self, current: int, total: int, start_time: float) -> str:
        """Format a progress message with percentage, throughput, and ETA.

        When ``total`` is unknown (<= 0), falls back to a count + rate message.
        """
        elapsed = time.time() - start_time
        # Guard every division: elapsed can be 0 on the very first call.
        items_per_second = current / elapsed if elapsed > 0 else 0.0
        if total > 0:
            percent = (current / total) * 100
            eta = (total - current) / items_per_second if items_per_second > 0 else 0
            return (
                f"[{current}/{total} ({percent:.1f}%)] "
                f"{items_per_second:.1f} items/sec, ETA: {eta:.1f}s"
            )
        # BUG FIX: the original divided current/elapsed unguarded here,
        # raising ZeroDivisionError when elapsed == 0.
        return f"[{current} items] {items_per_second:.1f} items/sec"

    def _print_progress(self, message: str, end: str = "\n") -> None:
        """Print a progress message, flushing so it appears in real time."""
        print(message, end=end, flush=True)

    async def _download_file(self, db: Session, url: str, filename: str, subdir: str) -> FileInDB:
        """Download ``url`` and persist it via the file service under ``mtgjson/<subdir>``.

        Returns the stored file's database record.
        """
        print(f"Downloading {url}...")
        # Delegate the HTTP GET to the base external service.
        file_data = await self._make_request(
            method="GET",
            endpoint=url.replace(self.base_url, ""),
            binary=True,
        )
        return await self.file_service.save_file(
            db=db,
            file_data=file_data,
            filename=filename,
            subdir=f"mtgjson/{subdir}",
            file_type="application/zip",
            content_type="application/zip",
        )

    async def _unzip_file(self, file_record: FileInDB, subdir: str, db: Session) -> str:
        """Extract the first member of a downloaded zip, register the extracted
        JSON with the file service, and return the extracted file's path.

        Raises:
            ValueError: if the archive contains no members.
        """
        try:
            # Route extraction to the dataset-specific cache subdirectory.
            extract_path = self.identifiers_dir if subdir == "identifiers" else self.skus_dir
            os.makedirs(extract_path, exist_ok=True)
            with zipfile.ZipFile(file_record.path, 'r') as zip_ref:
                names = zip_ref.namelist()
                # BUG FIX: the original indexed namelist()[0] unguarded and
                # raised an opaque IndexError on an empty archive.
                if not names:
                    raise ValueError(f"Zip archive {file_record.path} is empty")
                json_filename = names[0]
                zip_ref.extractall(extract_path)
            json_path = os.path.join(extract_path, json_filename)
            # Register the extracted JSON so later calls can find it by name.
            with open(json_path, 'r') as f:
                json_data = f.read()
            json_file_record = await self.file_service.save_file(
                db=db,
                file_data=json_data,
                filename=json_filename,
                subdir=f"mtgjson/{subdir}",
                file_type="application/json",
                content_type="application/json",
            )
            return str(json_file_record.path)
        except Exception as e:
            logger.error(f"Error unzipping file: {e}")
            raise

    def _stream_json_file(self, file_path: str) -> Generator[Dict[str, Any], None, None]:
        """Stream a pretty-printed MTGJSON file, yielding one top-level entry at a time.

        Yields dicts of the form ``{"type": "meta", "data": ...}`` for the file's
        ``meta`` section and ``{"type": "item", "data": {key: value}}`` for each
        entry of the ``data`` section.

        NOTE(review): this is a line-oriented heuristic parser — it assumes the
        file is pretty-printed with one key per line, and its brace counting does
        not account for braces inside string literals. It works for MTGJSON's
        formatting; it is not a general JSON parser.
        """
        logger.info(f"Starting to stream JSON file: {file_path}")
        try:
            with open(file_path, 'r') as f:
                data_started = False
                current_key = None
                current_value = []
                brace_count = 0
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    if not data_started:
                        if '"data":' in line:
                            data_started = True
                            # Drop everything up to (and including) the opening
                            # brace of the data object, keeping any remainder.
                            line = line[line.find('"data":') + 7:].strip()
                            if line.startswith('{'):
                                line = line[1:].strip()
                        else:
                            # Yield the meta section if it appears before data.
                            if '"meta":' in line:
                                meta_start = line.find('"meta":') + 7
                                meta_end = line.rfind('}')
                                if meta_end > meta_start:
                                    meta_json = line[meta_start:meta_end + 1]
                                    try:
                                        meta_data = json.loads(meta_json)
                                        yield {"type": "meta", "data": meta_data}
                                    except json.JSONDecodeError as e:
                                        logger.warning(f"Failed to parse meta data: {e}")
                            continue
                    if data_started:
                        if not current_key:
                            # Look for the next entry's key on this line.
                            if '"' in line:
                                key_start = line.find('"') + 1
                                key_end = line.find('"', key_start)
                                if key_end > key_start:
                                    current_key = line[key_start:key_end]
                                    # Keep whatever value text follows the key.
                                    line = line[key_end + 1:].strip()
                                    if ':' in line:
                                        line = line[line.find(':') + 1:].strip()
                        if current_key:
                            current_value.append(line)
                            brace_count += line.count('{') - line.count('}')
                            # BUG FIX: the original only emitted an entry whose
                            # last line ended with ',', silently dropping the
                            # FINAL entry of the data object (no trailing comma).
                            # An entry is complete once its braces balance.
                            if brace_count <= 0:
                                value_str = ''.join(current_value).strip().rstrip(',').strip()
                                # The last entry may carry the closing brace(s)
                                # of the enclosing data object; strip them.
                                while brace_count < 0 and value_str.endswith('}'):
                                    value_str = value_str[:-1].strip().rstrip(',').strip()
                                    brace_count += 1
                                try:
                                    value = json.loads(value_str)
                                    yield {"type": "item", "data": {current_key: value}}
                                except json.JSONDecodeError as e:
                                    logger.warning(f"Failed to parse value for key {current_key}: {e}")
                                current_key = None
                                current_value = []
                                brace_count = 0
        except Exception as e:
            logger.error(f"Error streaming JSON file: {e}")
            raise

    async def _get_or_fetch_stream(
        self, db: Session, filename: str, url: str, subdir: str
    ) -> Generator[Dict[str, Any], None, None]:
        """Return a streaming generator for ``filename``, downloading and
        unzipping it first unless a cached copy already exists on disk.

        Shared implementation for get_identifiers/get_skus (the original
        duplicated this logic verbatim in both methods).
        """
        cached_file = await self.file_service.get_file_by_filename(db, filename)
        # A stale DB record whose file is gone falls through to a re-download.
        if cached_file and os.path.exists(cached_file.path):
            return self._stream_json_file(cached_file.path)
        file_record = await self._download_file(
            db=db,
            url=url,
            filename=f"{filename}.zip",
            subdir=subdir,
        )
        json_path = await self._unzip_file(file_record, subdir, db)
        return self._stream_json_file(json_path)

    async def get_identifiers(self, db: Session) -> Generator[Dict[str, Any], None, None]:
        """Download (or reuse cached) MTGJSON AllIdentifiers data and return a
        generator that streams its entries."""
        return await self._get_or_fetch_stream(
            db=db,
            filename="AllIdentifiers.json",
            url="https://mtgjson.com/api/v5/AllIdentifiers.json.zip",
            subdir="identifiers",
        )

    async def get_skus(self, db: Session) -> Generator[Dict[str, Any], None, None]:
        """Download (or reuse cached) MTGJSON TcgplayerSkus data and return a
        generator that streams its entries."""
        return await self._get_or_fetch_stream(
            db=db,
            filename="TcgplayerSkus.json",
            url="https://mtgjson.com/api/v5/TcgplayerSkus.json.zip",
            subdir="skus",
        )

    async def clear_cache(self, db: Session) -> None:
        """Delete every cached MTGJSON file tracked by the file service."""
        try:
            files = await self.file_service.list_files(db, file_type=["json", "zip"])
            for file in files:
                # NOTE(review): this assumes FileInDB.path is relative and
                # begins with "mtgjson/"; if paths are absolute the prefix
                # check never matches — confirm against FileService.save_file.
                if file.path.startswith("mtgjson/"):
                    await self.file_service.delete_file(db, file.id)
            logger.info("MTGJSON cache cleared")
        except Exception as e:
            logger.error(f"Error clearing cache: {e}")
            raise