import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import uuid4

from sqlalchemy import or_
from sqlalchemy.orm import Session

from db.models import (
    Box,
    File,
    StagedFileProduct,
    Product,
    OpenBoxCard,
    OpenBox,
    Inventory,
    TCGPlayerGroups,
)
from db.utils import db_transaction
from schemas.box import (
    CreateBoxRequest,
    CreateBoxResponse,
    UpdateBoxRequest,
    CreateOpenBoxRequest,
)
from services.inventory import InventoryService

logger = logging.getLogger(__name__)

# Box types accepted by BoxService.validate_box_type.
VALID_BOX_TYPES = {"collector", "play", "draft", "set", "commander"}


class BoxService:
    """Create, update, delete and open boxes, delegating stock changes to InventoryService."""

    def __init__(self, db: Session, inventory_service: InventoryService):
        """Store the database session and the inventory service used by all operations."""
        self.db = db
        self.inventory_service = inventory_service

    def validate_file_ids(self, file_ids: List[str]) -> None:
        """Validate that all provided file IDs exist in the database.

        Args:
            file_ids: IDs expected to match ``File.id`` values.

        Raises:
            ValueError: If one or more IDs have no matching ``File`` row. The
                message lists the missing IDs in the order they were given.
        """
        if not file_ids:
            return

        # One IN query instead of one lookup per ID.
        # NOTE(review): membership is checked in Python, so the given IDs must
        # have the same type as the values stored in File.id — confirm.
        known_ids = {
            found_id
            for (found_id,) in self.db.query(File.id)
            .filter(File.id.in_(file_ids))
            .all()
        }
        invalid_files = [file_id for file_id in file_ids if file_id not in known_ids]
        if invalid_files:
            raise ValueError(f"File IDs not found: {', '.join(invalid_files)}")

    def get_staged_product_data(self, file_ids: List[str]) -> List[StagedFileProduct]:
        """Retrieve staged product rows belonging to the given file IDs.

        Returns an empty list without querying when ``file_ids`` is empty or None.
        """
        if not file_ids:
            return []
        return (
            self.db.query(StagedFileProduct)
            .filter(StagedFileProduct.file_id.in_(file_ids))
            .all()
        )

    def aggregate_staged_product_data(
        self, staged_product_data: List[StagedFileProduct]
    ) -> Dict[Product, int]:
        """Sum staged quantities per product.

        Rows whose ``product_id`` has no matching ``Product`` are ignored.

        Args:
            staged_product_data: Staged rows exposing ``product_id`` and ``quantity``.

        Returns:
            Mapping of ``Product`` to its total staged quantity, in order of
            first appearance in ``staged_product_data``.
        """
        if not staged_product_data:
            return {}

        # Fetch every referenced product once rather than once per staged row.
        product_ids = {
            row.product_id
            for row in staged_product_data
            if row.product_id is not None
        }
        products_by_id: Dict[Any, Product] = {}
        if product_ids:
            products_by_id = {
                product.id: product
                for product in self.db.query(Product)
                .filter(Product.id.in_(list(product_ids)))
                .all()
            }

        product_data: Dict[Product, int] = {}
        for row in staged_product_data:
            product = products_by_id.get(row.product_id)
            if product is not None:
                product_data[product] = product_data.get(product, 0) + row.quantity
        return product_data

    def find_product_for_box_data(self, create_box_data: Dict[str, Any]) -> Optional[Product]:
        """Find an existing "box" product matching the given box data.

        Matches on ``name``, ``set_code``, ``set_name`` and ``product_line``
        from ``create_box_data``; returns the first match or None.
        """
        return self.db.query(Product).filter(
            Product.name == create_box_data["name"],
            Product.type == "box",
            Product.set_code == create_box_data["set_code"],
            Product.set_name == create_box_data["set_name"],
            Product.product_line == create_box_data["product_line"],
        ).first()

    def create_product_for_box(self, create_box_data: Dict[str, Any]) -> Product:
        """Create a new "box" product with a fresh UUID and add it to the session."""
        product = Product(
            id=str(uuid4()),
            name=create_box_data["name"],
            type="box",
            set_code=create_box_data["set_code"],
            set_name=create_box_data["set_name"],
            product_line=create_box_data["product_line"],
        )
        self.db.add(product)
        return product

    def create_box_db(self, product: Product, create_box_data: Dict[str, Any]) -> Box:
        """Create a box record linked to ``product`` and add it to the session."""
        box = Box(
            product_id=product.id,
            type=create_box_data["type"],
            sku=create_box_data["sku"],
            num_cards_expected=create_box_data["num_cards_expected"],
        )
        self.db.add(box)
        return box

    def create_open_box(self, product: Product, create_box_data: Dict[str, Any]) -> OpenBox:
        """Create an open box record for ``product`` and add it to the session.

        ``create_box_data["date_opened"]`` must be a ``YYYY-MM-DD`` string.
        """
        open_box = OpenBox(
            id=str(uuid4()),
            product_id=product.id,
            num_cards_actual=create_box_data["num_cards_actual"],
            date_opened=datetime.strptime(create_box_data["date_opened"], "%Y-%m-%d"),
        )
        self.db.add(open_box)
        return open_box

    def add_products_to_open_box(self, open_box: OpenBox, product_data: Dict[Product, int]) -> None:
        """Add one ``OpenBoxCard`` per product/quantity pair to the session."""
        for product, quantity in product_data.items():
            open_box_card = OpenBoxCard(
                id=str(uuid4()),
                open_box_id=open_box.id,
                card_id=product.id,
                quantity=quantity,
            )
            self.db.add(open_box_card)

    def format_response(
        self,
        open_box: Optional[OpenBox] = None,
        inventory: Optional[Inventory] = None,
    ) -> CreateBoxResponse:
        """Format the response for box creation.

        Both arguments are currently unused; the response only reports success.
        """
        return CreateBoxResponse(success=True)

    def _create_box(
        self,
        create_box_data: Dict[str, Any],
        file_ids: Optional[List[str]] = None,
    ) -> CreateBoxResponse:
        """Internal method to handle box creation logic.

        A sealed box is added to inventory. An unsealed box gets an open box
        record and, when ``file_ids`` are given, the staged products from those
        files are processed and attached to it.

        Args:
            create_box_data: Box attributes; must include ``sealed`` plus the
                keys read by the helper methods called here.
            file_ids: Optional IDs of files holding staged products.

        Raises:
            ValueError: If ``file_ids`` are supplied for a sealed box, or if any
                file ID does not exist.
        """
        sealed = create_box_data["sealed"]

        if file_ids and sealed:
            raise ValueError("Cannot add cards with a sealed box")

        # Bound up front so the name exists on every path.
        product_data: Dict[Product, int] = {}
        if file_ids:
            self.validate_file_ids(file_ids)
            staged_product_data = self.get_staged_product_data(file_ids)
            product_data = self.aggregate_staged_product_data(staged_product_data)

        box_product = self.find_product_for_box_data(create_box_data)

        try:
            with db_transaction(self.db):
                if not box_product:
                    box_product = self.create_product_for_box(create_box_data)

                self.create_box_db(box_product, create_box_data)

                if sealed:
                    inventory = self.inventory_service.add_sealed_box_to_inventory(box_product, 1)
                    return self.format_response(inventory=inventory)

                open_box = self.create_open_box(box_product, create_box_data)
                if file_ids:
                    self.inventory_service.process_staged_products(product_data)
                    self.add_products_to_open_box(open_box, product_data)
                    # Update file statuses to processed
                    self.db.query(File).filter(File.id.in_(file_ids)).update(
                        {"status": "processed"}, synchronize_session=False
                    )
                return self.format_response(open_box=open_box)
        except Exception as e:
            # Log at the service boundary, then let the caller handle it.
            logger.error("Error creating box: %s", e)
            raise

    def validate_box_type(self, box_type: str) -> bool:
        """Validate if the box type is one of ``VALID_BOX_TYPES``."""
        return box_type in VALID_BOX_TYPES

    def validate_set_code(self, set_code: str) -> bool:
        """Validate if a ``TCGPlayerGroups`` row has ``set_code`` as its abbreviation."""
        return self.db.query(TCGPlayerGroups).filter(
            TCGPlayerGroups.abbreviation == set_code
        ).first() is not None

    def create_box(self, create_box_data: CreateBoxRequest) -> Box:
        """Create a new box.

        Raises:
            ValueError: If the box type or set code is invalid, or if a box with
                the same set code and either the same type or the same SKU
                already exists.
        """
        if not self.validate_box_type(create_box_data.type):
            raise ValueError("Invalid box type")
        if not self.validate_set_code(create_box_data.set_code):
            raise ValueError("Invalid set code")

        existing_box = self.db.query(Box).filter(
            or_(
                Box.type == create_box_data.type,
                Box.sku == create_box_data.sku,
            ),
            Box.set_code == create_box_data.set_code,
        ).first()
        if existing_box:
            raise ValueError("Box already exists")

        with db_transaction(self.db):
            box = Box(
                product_id=str(uuid4()),
                type=create_box_data.type,
                set_code=create_box_data.set_code,
                sku=create_box_data.sku,
                num_cards_expected=create_box_data.num_cards_expected,
            )
            self.db.add(box)

        return box

    def update_box(self, box_id: str, update_box_data: UpdateBoxRequest) -> Box:
        """Update an existing box with the explicitly set, non-None request fields.

        Args:
            box_id: ``product_id`` of the box to update.
            update_box_data: Requested changes.

        Raises:
            ValueError: If the box does not exist, or a new type or set code is invalid.
        """
        box = self.db.query(Box).filter(Box.product_id == box_id).first()
        if not box:
            raise ValueError("Box not found")

        update_data = update_box_data.dict(exclude_unset=True)

        # Validate box type if it's being updated
        new_type = update_data.get("type")
        if new_type is not None and not self.validate_box_type(new_type):
            raise ValueError(f"Invalid box type: {new_type}")

        # Validate set code if it's being updated
        new_set_code = update_data.get("set_code")
        if new_set_code is not None and not self.validate_set_code(new_set_code):
            raise ValueError(f"Invalid set code: {new_set_code}")

        with db_transaction(self.db):
            for field, value in update_data.items():
                if value is not None:  # Only update non-None values
                    setattr(box, field, value)

        return box

    def delete_box(self, box_id: str) -> Box:
        """Delete the box whose ``product_id`` equals ``box_id`` and return it.

        Raises:
            ValueError: If the box does not exist.
        """
        box = self.db.query(Box).filter(Box.product_id == box_id).first()
        if not box:
            raise ValueError("Box not found")

        with db_transaction(self.db):
            self.db.delete(box)

        return box

    def open_box(self, box_id: str, box_data: CreateOpenBoxRequest) -> OpenBox:
        """Open a box and process its contents.

        Creates an ``OpenBox`` record. When ``box_data.file_ids`` is non-empty,
        the staged products from those files are processed, attached to the
        open box, and the files' ``box_id`` is set to the open box ID.

        Args:
            box_id: ``product_id`` of the box being opened.
            box_data: Open-box details. ``date_opened`` is a ``YYYY-MM-DD``
                string; when falsy the current local time is used.

        Raises:
            ValueError: If the box does not exist.
        """
        box = self.db.query(Box).filter(Box.product_id == box_id).first()
        if not box:
            raise ValueError("Box not found")

        # Treat a missing file list like an empty one, as _create_box does.
        file_ids = box_data.file_ids or []

        with db_transaction(self.db):
            open_box = OpenBox(
                id=str(uuid4()),
                product_id=box_id,
                num_cards_actual=box_data.num_cards_actual,
                date_opened=(
                    datetime.strptime(box_data.date_opened, "%Y-%m-%d")
                    if box_data.date_opened
                    else datetime.now()
                ),
            )
            self.db.add(open_box)

            if file_ids:
                staged_product_data = self.get_staged_product_data(file_ids)
                product_data = self.aggregate_staged_product_data(staged_product_data)
                self.inventory_service.process_staged_products(product_data)
                self.add_products_to_open_box(open_box, product_data)

                # Update file box IDs
                self.db.query(File).filter(File.id.in_(file_ids)).update(
                    {"box_id": open_box.id}, synchronize_session=False
                )

        return open_box