import os import json from datetime import datetime, timezone from typing import Optional, List, Dict, Any, Union from sqlalchemy.orm import Session from app.models.tcgplayer_products import TCGPlayerProduct, TCGPlayerCategory, TCGPlayerGroup from app.models.inventory_management import SealedExpectedValue from app.services.base_service import BaseService from app.schemas.file import FileInDB from app.db.database import transaction as db_transaction from app.schemas.transaction import PurchaseTransactionCreate, PurchaseItem from app.contexts.inventory_item import InventoryItemContextFactory from app.models.tcgplayer_products import MTGJSONSKU, MTGJSONCard from app.models.tcgplayer_products import TCGPlayerPriceHistory from app.models.critical_error_log import CriticalErrorLog import csv import io import logging import shutil import py7zr logger = logging.getLogger(__name__) class DataInitializationService(BaseService): def __init__(self): super().__init__(None) async def _cache_data( self, db: Session, data: Union[dict, list], filename: str, subdir: str, default_str: bool = False, file_type: str = "json", content_type: str = "application/json", metadata: Optional[Dict] = None ) -> FileInDB: """Generic function to cache data to a JSON file""" file_data = json.dumps(data, default=str if default_str else None, indent=2) return await self.file_service.save_file( db, file_data, filename, subdir, file_type=file_type, content_type=content_type, metadata=metadata ) async def _load_cached_data( self, db: Session, filename: str ) -> Optional[Dict[str, Any]]: """Generic function to load cached data from a JSON file with 7-day expiration""" file_record = await self.file_service.get_file_by_filename(db, filename) if file_record: # Check if cache is expired (7 days) # Ensure both datetimes are timezone-aware cache_age = datetime.now(timezone.utc) - file_record.created_at if cache_age.days < 7: with open(file_record.path, 'r') as f: return json.load(f) else: logger.info(f"Cache expired for {filename}, age: {cache_age.days} days") # Delete the expired cache file await self.file_service.delete_file(db, file_record.id) return None async def sync_categories(self, db: Session, categories_data: dict): """Sync categories data to the database using streaming for large datasets""" categories = categories_data.get("results", []) batch_size = 1000 # Process in batches of 1000 total_categories = len(categories) with db_transaction(db): for i in range(0, total_categories, batch_size): batch = categories[i:i + batch_size] for category_data in batch: existing_category = db.query(TCGPlayerCategory).filter(TCGPlayerCategory.category_id == category_data["categoryId"]).first() if existing_category: # Update existing category for key, value in { "name": category_data["name"], "display_name": category_data.get("displayName"), "seo_category_name": category_data.get("seoCategoryName"), "category_description": category_data.get("categoryDescription"), "category_page_title": category_data.get("categoryPageTitle"), "sealed_label": category_data.get("sealedLabel"), "non_sealed_label": category_data.get("nonSealedLabel"), "condition_guide_url": category_data.get("conditionGuideUrl"), "is_scannable": category_data.get("isScannable", False), "popularity": category_data.get("popularity", 0), "is_direct": category_data.get("isDirect", False), "modified_on": datetime.fromisoformat(category_data["modifiedOn"].replace("Z", "+00:00")) if category_data.get("modifiedOn") else None }.items(): setattr(existing_category, key, value) else: new_category = TCGPlayerCategory( category_id=category_data["categoryId"], name=category_data["name"], display_name=category_data.get("displayName"), seo_category_name=category_data.get("seoCategoryName"), category_description=category_data.get("categoryDescription"), category_page_title=category_data.get("categoryPageTitle"), sealed_label=category_data.get("sealedLabel"), non_sealed_label=category_data.get("nonSealedLabel"), condition_guide_url=category_data.get("conditionGuideUrl"), is_scannable=category_data.get("isScannable", False), popularity=category_data.get("popularity", 0), is_direct=category_data.get("isDirect", False), modified_on=datetime.fromisoformat(category_data["modifiedOn"].replace("Z", "+00:00")) if category_data.get("modifiedOn") else None ) db.add(new_category) # Commit after each batch db.commit() logger.info(f"Processed {min(i + batch_size, total_categories)}/{total_categories} categories") async def init_categories(self, db: Session, use_cache: bool = True) -> bool: """Initialize categories data""" logger.info("Starting categories initialization") if use_cache: categories_data = await self._load_cached_data(db, "categories.json") if categories_data: await self.sync_categories(db, categories_data) logger.info("Categories initialized from cache") return True else: logger.warning("No cached categories data found") return False else: tcgcsv_service = self.get_service('tcgcsv') categories_data = await tcgcsv_service.get_categories() # Save the categories data await self._cache_data( db, categories_data, "categories.json", "tcgcsv/categories", file_type="json", content_type="application/json" ) await self.sync_categories(db, categories_data) logger.info("Categories initialized from API") return True async def sync_groups(self, db: Session, groups_data: dict): """Sync groups data to the database using streaming for large datasets""" groups = groups_data.get("results", []) batch_size = 1000 # Process in batches of 1000 total_groups = len(groups) with db_transaction(db): for i in range(0, total_groups, batch_size): batch = groups[i:i + batch_size] for group_data in batch: existing_group = db.query(TCGPlayerGroup).filter(TCGPlayerGroup.group_id == group_data["groupId"]).first() if existing_group: # Update existing group for key, value in { "name": group_data["name"], "abbreviation": group_data.get("abbreviation"), "is_supplemental": group_data.get("isSupplemental", False), "published_on": datetime.fromisoformat(group_data["publishedOn"].replace("Z", "+00:00")) if group_data.get("publishedOn") else None, "modified_on": datetime.fromisoformat(group_data["modifiedOn"].replace("Z", "+00:00")) if group_data.get("modifiedOn") else None, "category_id": group_data.get("categoryId") }.items(): setattr(existing_group, key, value) else: new_group = TCGPlayerGroup( group_id=group_data["groupId"], name=group_data["name"], abbreviation=group_data.get("abbreviation"), is_supplemental=group_data.get("isSupplemental", False), published_on=datetime.fromisoformat(group_data["publishedOn"].replace("Z", "+00:00")) if group_data.get("publishedOn") else None, modified_on=datetime.fromisoformat(group_data["modifiedOn"].replace("Z", "+00:00")) if group_data.get("modifiedOn") else None, category_id=group_data.get("categoryId") ) db.add(new_group) # Commit after each batch db.commit() logger.info(f"Processed {min(i + batch_size, total_groups)}/{total_groups} groups") async def init_groups(self, db: Session, use_cache: bool = True, game_ids: List[int] = None) -> bool: """Initialize groups data""" logger.info(f"Starting groups initialization for game IDs: {game_ids}") tcgcsv_service = self.get_service('tcgcsv') for game_id in game_ids: if use_cache: groups_data = await self._load_cached_data(db, f"groups_{game_id}.json") if groups_data: await self.sync_groups(db, groups_data) logger.info(f"Groups initialized from cache for game ID {game_id}") else: logger.warning(f"No cached groups data found for game ID {game_id}") return False else: groups_data = await tcgcsv_service.get_groups(game_id) # Save the groups data await self._cache_data( db, groups_data, f"groups_{game_id}.json", "tcgcsv/groups", file_type="json", content_type="application/json" ) await self.sync_groups(db, groups_data) logger.info(f"Groups initialized from API for game ID {game_id}") return True async def sync_products(self, db: Session, products_data: str): """Sync products data to the database using streaming for large datasets""" # Parse CSV data csv_reader = csv.DictReader(io.StringIO(products_data)) products_list = list(csv_reader) batch_size = 1000 # Process in batches of 1000 total_products = len(products_list) with db_transaction(db): for i in range(0, total_products, batch_size): batch = products_list[i:i + batch_size] for product_data in batch: sub_type_name = product_data.get("subTypeName") if product_data.get("subTypeName") else "other" existing_product = db.query(TCGPlayerProduct).filter(TCGPlayerProduct.tcgplayer_product_id == product_data["productId"]).filter(TCGPlayerProduct.sub_type_name == sub_type_name).first() if existing_product: # Update existing product for key, value in { "name": product_data["name"], "clean_name": product_data.get("cleanName"), "image_url": product_data.get("imageUrl"), "sub_type_name": product_data.get("subTypeName") if product_data.get("subTypeName") else "other", "normalized_sub_type_name": product_data.get("subTypeName").lower().replace(" ", "_") if product_data.get("subTypeName") else "other", "category_id": product_data.get("categoryId"), "group_id": product_data.get("groupId"), "url": product_data.get("url"), "modified_on": datetime.fromisoformat(product_data["modifiedOn"].replace("Z", "+00:00")) if product_data.get("modifiedOn") else None, "image_count": product_data.get("imageCount", 0), "ext_rarity": product_data.get("extRarity"), "ext_subtype": product_data.get("extSubtype"), "ext_oracle_text": product_data.get("extOracleText"), "ext_number": product_data.get("extNumber"), "low_price": float(product_data.get("lowPrice")) if product_data.get("lowPrice") else None, "mid_price": float(product_data.get("midPrice")) if product_data.get("midPrice") else None, "high_price": float(product_data.get("highPrice")) if product_data.get("highPrice") else None, "market_price": float(product_data.get("marketPrice")) if product_data.get("marketPrice") else None, "direct_low_price": float(product_data.get("directLowPrice")) if product_data.get("directLowPrice") else None, "ext_flavor_text": product_data.get("extFlavorText"), "ext_power": product_data.get("extPower"), "ext_toughness": product_data.get("extToughness"), "ext_flavor_text": product_data.get("extFlavorText") }.items(): setattr(existing_product, key, value) else: logger.debug(f"Creating new product: {product_data['productId']} product name: {product_data['name']}") new_product = TCGPlayerProduct( tcgplayer_product_id=product_data["productId"], name=product_data["name"], normalized_sub_type_name=product_data.get("subTypeName").lower().replace(" ", "_") if product_data.get("subTypeName") else "other", clean_name=product_data.get("cleanName"), image_url=product_data.get("imageUrl"), category_id=product_data.get("categoryId"), group_id=product_data.get("groupId"), url=product_data.get("url"), modified_on=datetime.fromisoformat(product_data["modifiedOn"].replace("Z", "+00:00")) if product_data.get("modifiedOn") else None, image_count=product_data.get("imageCount", 0), ext_rarity=product_data.get("extRarity"), ext_subtype=product_data.get("extSubtype"), ext_oracle_text=product_data.get("extOracleText"), ext_number=product_data.get("extNumber"), low_price=float(product_data.get("lowPrice")) if product_data.get("lowPrice") else None, mid_price=float(product_data.get("midPrice")) if product_data.get("midPrice") else None, high_price=float(product_data.get("highPrice")) if product_data.get("highPrice") else None, market_price=float(product_data.get("marketPrice")) if product_data.get("marketPrice") else None, direct_low_price=float(product_data.get("directLowPrice")) if product_data.get("directLowPrice") else None, sub_type_name=product_data.get("subTypeName") if product_data.get("subTypeName") else "other", ext_power=product_data.get("extPower"), ext_toughness=product_data.get("extToughness"), ext_flavor_text=product_data.get("extFlavorText") ) db.add(new_product) # Commit after each batch db.commit() logger.info(f"Processed {min(i + batch_size, total_products)}/{total_products} products") async def init_products(self, db: Session, use_cache: bool = True, game_ids: List[int] = None) -> bool: """Initialize products data""" logger.info(f"Starting products initialization for game IDs: {game_ids}") tcgcsv_service = self.get_service('tcgcsv') for game_id in game_ids: groups = db.query(TCGPlayerGroup).filter(TCGPlayerGroup.category_id == game_id).all() logger.info(f"Processing {len(groups)} groups for game ID {game_id}") for group in groups: if use_cache: products_data = await self._load_cached_data(db, f"products_{game_id}_{group.group_id}.json") if products_data: await self.sync_products(db, products_data) logger.info(f"Products initialized from cache for group {group.group_id}") else: logger.warning(f"No cached products data found for group {group.group_id}") continue else: # Get CSV data from API csv_data = await tcgcsv_service.get_products_and_prices(game_id, group.group_id) # Save the CSV file await self.file_service.save_file( db, csv_data, f"products_{game_id}_{group.group_id}.csv", "tcgcsv/products", file_type="csv", content_type="text/csv" ) # Parse and sync the CSV data await self.sync_products(db, csv_data) logger.info(f"Products initialized from API for group {group.group_id}") return True async def sync_archived_prices(self, db: Session, archived_prices_data: dict, date: datetime): """Sync archived prices data to the database using bulk operations. Note: Historical prices are never updated, only new records are inserted.""" if not archived_prices_data.get("success"): logger.error("Price data sync failed - success flag is false") return # Get existing records in bulk to avoid duplicates using a composite key existing_records = db.query(TCGPlayerPriceHistory).filter( TCGPlayerPriceHistory.date == date ).all() existing_keys = {(r.product_id, r.date, r.sub_type_name) for r in existing_records} # Prepare batch insert data price_history_batch = [] # Process price data in batches for price_data in archived_prices_data.get("results", []): try: # Get the subtype name from the price data sub_type_name = price_data.get("subTypeName", "None") # First try to find product with the requested subtype product = db.query(TCGPlayerProduct).filter( TCGPlayerProduct.tcgplayer_product_id == price_data["productId"], TCGPlayerProduct.sub_type_name == sub_type_name ).first() # If not found and subtype isn't "other", try with "other" subtype if not product and sub_type_name != "other": product = db.query(TCGPlayerProduct).filter( TCGPlayerProduct.tcgplayer_product_id == price_data["productId"], TCGPlayerProduct.sub_type_name == "other" ).first() if product: sub_type_name = "other" #logger.info(f"Found product {price_data['productId']} with 'other' subtype as fallback for {sub_type_name}") if not product: logger.warning(f"No product found for {price_data['productId']} with subtype {sub_type_name} or 'other'") continue # Skip if record already exists if (product.tcgplayer_product_id, date, sub_type_name) in existing_keys: continue # Validate and convert price data try: price_history = TCGPlayerPriceHistory( product_id=product.tcgplayer_product_id, sub_type_name=sub_type_name, date=date, low_price=float(price_data.get("lowPrice")) if price_data.get("lowPrice") else None, mid_price=float(price_data.get("midPrice")) if price_data.get("midPrice") else None, high_price=float(price_data.get("highPrice")) if price_data.get("highPrice") else None, market_price=float(price_data.get("marketPrice")) if price_data.get("marketPrice") else None, direct_low_price=float(price_data.get("directLowPrice")) if price_data.get("directLowPrice") else None ) price_history_batch.append(price_history) except (ValueError, TypeError) as e: logger.error(f"Invalid price data for product {price_data['productId']}: {str(e)}") continue # Process in batches of 1000 if len(price_history_batch) >= 1000: with db_transaction(db): db.bulk_save_objects(price_history_batch) price_history_batch = [] except Exception as e: logger.error(f"Error processing price data for product {price_data['productId']}: {str(e)}") continue # Process any remaining records if price_history_batch: with db_transaction(db): db.bulk_save_objects(price_history_batch) async def init_archived_prices(self, db: Session, start_date: datetime, end_date: datetime, use_cache: bool = True, game_ids: List[int] = None) -> bool: """Initialize archived prices data""" logger.info(f"Starting archived prices initialization from {start_date} to {end_date}") tcgcsv_service = self.get_service('tcgcsv') processed_dates = await tcgcsv_service.get_tcgcsv_date_range(start_date, end_date) logger.info(f"Processing {len(processed_dates)} dates") # Convert game_ids to set for faster lookups desired_game_ids = set(game_ids) if game_ids else set() for date in processed_dates: date_path = f"app/data/cache/tcgcsv/prices/{date}" # Check if we already have the data for this date if use_cache and os.path.exists(date_path): logger.info(f"Using cached price data for {date}") else: logger.info(f"Downloading and processing archived prices for {date}") # Download and extract the archive archive_data = await tcgcsv_service.get_archived_prices_for_date(date) # Save the archive file file_record = await self.file_service.save_file( db, archive_data, f"prices-{date}.ppmd.7z", "tcgcsv/prices/zip", file_type="application/x-7z-compressed", content_type="application/x-7z-compressed" ) # Extract the 7z file to a temporary directory temp_extract_path = f"app/data/cache/tcgcsv/prices/temp_{date}" os.makedirs(temp_extract_path, exist_ok=True) with py7zr.SevenZipFile(file_record.path, 'r') as archive: archive.extractall(path=temp_extract_path) # Find the date subdirectory in the temp directory date_subdir = os.path.join(temp_extract_path, str(date)) if os.path.exists(date_subdir): # Remove existing directory if it exists if os.path.exists(date_path): shutil.rmtree(date_path) # Create the destination directory os.makedirs(date_path, exist_ok=True) # Move contents from the date subdirectory to the final path for item in os.listdir(date_subdir): src = os.path.join(date_subdir, item) dst = os.path.join(date_path, item) os.rename(src, dst) # Clean up the temporary directory os.rmdir(date_subdir) os.rmdir(temp_extract_path) # Process each category directory for category_id in os.listdir(date_path): # Skip categories that aren't in our desired game IDs if int(category_id) not in desired_game_ids: continue category_path = os.path.join(date_path, category_id) if not os.path.isdir(category_path): continue # Process each group directory for group_id in os.listdir(category_path): group_path = os.path.join(category_path, group_id) if not os.path.isdir(group_path): continue # Process the prices file prices_file = os.path.join(group_path, "prices") if not os.path.exists(prices_file): continue try: with open(prices_file, 'r') as f: price_data = json.load(f) if price_data.get("success"): await self.sync_archived_prices(db, price_data, datetime.strptime(date, "%Y-%m-%d")) logger.info(f"Processed prices for category {category_id}, group {group_id} on {date}") except Exception as e: logger.error(f"Error processing prices file {prices_file}: {str(e)}") continue return True async def init_mtgjson(self, db: Session, use_cache: bool = True) -> Dict[str, Any]: """Initialize MTGJSON data""" logger.info("Starting MTGJSON initialization") mtgjson_service = self.get_service('mtgjson') identifiers_count = 0 skus_count = 0 # Get identifiers data identifiers_data = await mtgjson_service.get_identifiers(db, use_cache) if identifiers_data and "data" in identifiers_data: identifiers_count = await self.sync_mtgjson_identifiers(db, list(identifiers_data["data"].values())) # Get SKUs data skus_data = await mtgjson_service.get_skus(db, use_cache) if skus_data and "data" in skus_data: skus_count = await self.sync_mtgjson_skus(db, skus_data) return { "identifiers_processed": identifiers_count, "skus_processed": skus_count } async def sync_mtgjson_identifiers(self, db: Session, identifiers_data: List[dict]) -> int: count = 0 with db_transaction(db): # Load all existing UUIDs once existing_cards = { card.mtgjson_uuid: card for card in db.query(MTGJSONCard).all() } new_cards = [] for card_data in identifiers_data: if not isinstance(card_data, dict): logger.debug(f"Skipping non-dict item: {card_data}") continue uuid = card_data.get("uuid") identifiers = card_data.get("identifiers", {}) if uuid in existing_cards: card = existing_cards[uuid] updates = { "name": card_data.get("name"), "set_code": card_data.get("setCode"), "abu_id": identifiers.get("abuId"), "card_kingdom_etched_id": identifiers.get("cardKingdomEtchedId"), "card_kingdom_foil_id": identifiers.get("cardKingdomFoilId"), "card_kingdom_id": identifiers.get("cardKingdomId"), "cardsphere_id": identifiers.get("cardsphereId"), "cardsphere_foil_id": identifiers.get("cardsphereFoilId"), "cardtrader_id": identifiers.get("cardtraderId"), "csi_id": identifiers.get("csiId"), "mcm_id": identifiers.get("mcmId"), "mcm_meta_id": identifiers.get("mcmMetaId"), "miniaturemarket_id": identifiers.get("miniaturemarketId"), "mtg_arena_id": identifiers.get("mtgArenaId"), "mtgjson_foil_version_id": identifiers.get("mtgjsonFoilVersionId"), "mtgjson_non_foil_version_id": identifiers.get("mtgjsonNonFoilVersionId"), "mtgjson_v4_id": identifiers.get("mtgjsonV4Id"), "mtgo_foil_id": identifiers.get("mtgoFoilId"), "mtgo_id": identifiers.get("mtgoId"), "multiverse_id": identifiers.get("multiverseId"), "scg_id": identifiers.get("scgId"), "scryfall_id": identifiers.get("scryfallId"), "scryfall_card_back_id": identifiers.get("scryfallCardBackId"), "scryfall_oracle_id": identifiers.get("scryfallOracleId"), "scryfall_illustration_id": identifiers.get("scryfallIllustrationId"), "tcgplayer_product_id": identifiers.get("tcgplayerProductId"), "tcgplayer_etched_product_id": identifiers.get("tcgplayerEtchedProductId"), "tnt_id": identifiers.get("tntId") } for k, v in updates.items(): if getattr(card, k) != v: setattr(card, k, v) else: new_cards.append(MTGJSONCard( mtgjson_uuid=uuid, name=card_data.get("name"), set_code=card_data.get("setCode"), abu_id=identifiers.get("abuId"), card_kingdom_etched_id=identifiers.get("cardKingdomEtchedId"), card_kingdom_foil_id=identifiers.get("cardKingdomFoilId"), card_kingdom_id=identifiers.get("cardKingdomId"), cardsphere_id=identifiers.get("cardsphereId"), cardsphere_foil_id=identifiers.get("cardsphereFoilId"), cardtrader_id=identifiers.get("cardtraderId"), csi_id=identifiers.get("csiId"), mcm_id=identifiers.get("mcmId"), mcm_meta_id=identifiers.get("mcmMetaId"), miniaturemarket_id=identifiers.get("miniaturemarketId"), mtg_arena_id=identifiers.get("mtgArenaId"), mtgjson_foil_version_id=identifiers.get("mtgjsonFoilVersionId"), mtgjson_non_foil_version_id=identifiers.get("mtgjsonNonFoilVersionId"), mtgjson_v4_id=identifiers.get("mtgjsonV4Id"), mtgo_foil_id=identifiers.get("mtgoFoilId"), mtgo_id=identifiers.get("mtgoId"), multiverse_id=identifiers.get("multiverseId"), scg_id=identifiers.get("scgId"), scryfall_id=identifiers.get("scryfallId"), scryfall_card_back_id=identifiers.get("scryfallCardBackId"), scryfall_oracle_id=identifiers.get("scryfallOracleId"), scryfall_illustration_id=identifiers.get("scryfallIllustrationId"), tcgplayer_product_id=identifiers.get("tcgplayerProductId"), tcgplayer_etched_product_id=identifiers.get("tcgplayerEtchedProductId"), tnt_id=identifiers.get("tntId") )) count += 1 if new_cards: db.bulk_save_objects(new_cards) return count async def sync_mtgjson_skus(self, db: Session, skus_data: dict) -> int: count = 0 sku_details_by_key = {} for mtgjson_uuid, product_data in skus_data["data"].items(): for sku_data in product_data: sku_id = sku_data.get("skuId") if sku_id is None or sku_id in sku_details_by_key: continue # Skip if missing or already added sku_details_by_key[sku_id] = { "mtgjson_uuid": mtgjson_uuid, "tcgplayer_sku_id": sku_id, "tcgplayer_product_id": sku_data.get("productId"), "printing": sku_data.get("printing"), "normalized_printing": sku_data.get("printing", "").lower().replace(" ", "_").replace("non_foil", "normal") if sku_data.get("printing") else None, "condition": sku_data.get("condition"), "finish": sku_data.get("finish"), "language": sku_data.get("language"), } with db_transaction(db): db.flush() valid_uuids = {uuid for (uuid,) in db.query(MTGJSONCard.mtgjson_uuid).all()} valid_product_keys = { (product.tcgplayer_product_id, product.normalized_sub_type_name) for product in db.query(TCGPlayerProduct.tcgplayer_product_id, TCGPlayerProduct.normalized_sub_type_name) } existing_sku_ids = { sku.tcgplayer_sku_id for sku in db.query(MTGJSONSKU.tcgplayer_sku_id).all() } existing = { (sku.mtgjson_uuid, sku.tcgplayer_sku_id): sku for sku in db.query(MTGJSONSKU).all() } new_skus = [] for data in sku_details_by_key.values(): sku_id = data["tcgplayer_sku_id"] if sku_id in existing_sku_ids: continue mtgjson_uuid = data["mtgjson_uuid"] product_id = data["tcgplayer_product_id"] normalized_printing = data["normalized_printing"] if mtgjson_uuid not in valid_uuids: continue if (product_id, normalized_printing) not in valid_product_keys: continue key = (mtgjson_uuid, sku_id) if key in existing: record = existing[key] for field, value in data.items(): if field not in ("mtgjson_uuid", "tcgplayer_sku_id") and getattr(record, field) != value: setattr(record, field, value) else: new_skus.append(MTGJSONSKU(**data)) count += 1 if new_skus: db.bulk_save_objects(new_skus) return count async def initialize_data( self, db: Session, game_ids: List[int], use_cache: bool = False, init_categories: bool = True, init_groups: bool = True, init_products: bool = True, init_archived_prices: bool = True, archived_prices_start_date: Optional[str] = None, archived_prices_end_date: Optional[str] = None, init_mtgjson: bool = True ) -> Dict[str, Any]: """Initialize 3rd party API data loads with configurable steps""" logger.info("Starting data initialization process") results = {} if init_categories: logger.info("Initializing categories...") results["categories"] = await self.init_categories(db, use_cache) if init_groups: logger.info("Initializing groups...") results["groups"] = await self.init_groups(db, use_cache, game_ids) if init_products: logger.info("Initializing products...") results["products"] = await self.init_products(db, use_cache, game_ids) if init_archived_prices: logger.info("Initializing archived prices...") results["archived_prices"] = await self.init_archived_prices( db, archived_prices_start_date, archived_prices_end_date, use_cache, game_ids ) if init_mtgjson: logger.info("Initializing MTGJSON data...") results["mtgjson"] = await self.init_mtgjson(db, use_cache) logger.info("Data initialization completed") return results async def clear_cache(self, db: Session) -> None: """Clear all cached data""" # Delete all files in categories, groups, and products directories for subdir in ["categories", "groups", "products"]: files = await self.file_service.list_files(db, file_type="json") for file in files: if file.path.startswith(subdir): await self.file_service.delete_file(db, file.id) await self.mtgjson_service.clear_cache() print("Cache cleared") async def initialize_inventory_data(self, db: Session) -> None: """Initialize inventory data""" with db_transaction(db): logger.info("Initializing inventory data...") # set expected value expected_value_box = SealedExpectedValue( tcgplayer_product_id=619645, expected_value=136.42 ) db.add(expected_value_box) #db.flush() #expected_value_case = SealedExpectedValue( # tcgplayer_product_id=562119, # expected_value=820.69 #) #db.add(expected_value_case) db.flush() inventory_service = self.get_service("inventory") customer = await inventory_service.create_customer(db, "Bob Smith") vendor = await inventory_service.create_vendor(db, "Joe Blow") marketplace = await inventory_service.create_marketplace(db, "Tcgplayer") transaction = await inventory_service.create_purchase_transaction(db, PurchaseTransactionCreate( vendor_id=vendor.id, transaction_date=datetime.now(), items=[PurchaseItem(product_id=619645, unit_price=100, quantity=1, is_case=False)], transaction_notes="tdm real box test" #PurchaseItem(product_id=562119, unit_price=800.01, quantity=2, is_case=True, num_boxes=6)], #transaction_notes="Test Transaction: 1 case and 2 boxes of foundations" )) logger.info(f"Transaction created: {transaction}") case_num = 0 for item in transaction.transaction_items: logger.info(f"Item: {item}") if item.inventory_item.physical_item.item_type == "box": manabox_service = self.get_service("manabox") #file_path = 'app/data/test_data/manabox_test_file.csv' file_path = 'app/data/test_data/tdmtest.csv' file_bytes = open(file_path, 'rb').read() manabox_file = await manabox_service.process_manabox_csv(db, file_bytes, {"source": "test", "description": "test"}, wait=True) # Ensure manabox_file is a list before passing it if not isinstance(manabox_file, list): manabox_file = [manabox_file] box_service = self.get_service("box") open_event = await box_service.open_box(db, item.inventory_item.physical_item, manabox_file) # get all cards from box cards = open_event.resulting_items if open_event.resulting_items else [] marketplace_listing_service = self.get_service("marketplace_listing") for card in cards: logger.info(f"card: {card}") # create marketplace listing await marketplace_listing_service.create_marketplace_listing(db, card.inventory_item, marketplace) elif item.inventory_item.physical_item.item_type == "case": if case_num == 0: logger.info(f"sealed case {case_num} opening...") case_service = self.get_service("case") success = await case_service.open_case(db, item.inventory_item.physical_item, 562119) logger.info(f"sealed case {case_num} opening success: {success}") case_num += 1 logger.info("Inventory data initialized")