from uuid import uuid4 from typing import List, TypedDict from sqlalchemy.orm import Session from app.db.utils import db_transaction from app.db.models import ( Warehouse, User, StagedFileProduct, StorageBlock, ProductBlock, File, CardTCGPlayer ) class ProductAttributes(TypedDict): """Attributes for a product to be stored.""" product_id: str card_number: str class StorageService: """Service for managing product storage and warehouse operations.""" def __init__(self, db: Session) -> None: """Initialize the storage service. Args: db: SQLAlchemy database session """ self.db = db def get_or_create_user(self, username: str) -> User: """Get an existing user or create a new one if not found. Args: username: Username to look up or create Returns: The existing or newly created User """ user = self.db.query(User).filter(User.username == username).first() if user is None: user = User( id=str(uuid4()), username=username ) with db_transaction(self.db): self.db.add(user) return user def get_or_create_warehouse(self) -> Warehouse: """Get the default warehouse or create it if it doesn't exist. Returns: The existing or newly created Warehouse """ warehouse = self.db.query(Warehouse).first() user = self.get_or_create_user('admin') if warehouse is None: warehouse = Warehouse( id=str(uuid4()), user_id=user.id ) with db_transaction(self.db): self.db.add(warehouse) return warehouse def get_staged_product(self, file_id: str) -> List[StagedFileProduct]: """Get all staged products for a given file. Args: file_id: ID of the file to get staged products for Returns: List of staged products """ return self.db.query(StagedFileProduct).filter( StagedFileProduct.file_id == file_id ).all() def get_storage_block_name(self, warehouse: Warehouse, file_id: str) -> str: """Generate a unique name for a new storage block. Args: warehouse: Warehouse the block belongs to file_id: ID of the file being processed Returns: Unique storage block name Raises: ValueError: If no file is found with the given ID """ current_file = self.db.query(File).filter(File.id == file_id).first() if not current_file: raise ValueError(f"No file found with id {file_id}") storage_block_type = 'rare' if 'rare' in current_file.type else 'common' prefix = storage_block_type[0] latest_block = ( self.db.query(StorageBlock) .filter( StorageBlock.warehouse_id == warehouse.id, StorageBlock.type == storage_block_type ) .order_by(StorageBlock.date_created.desc()) .first() ) start_number = 1 if not latest_block else int(latest_block.name[1:]) + 1 while True: new_name = f"{prefix}{start_number}" exists = ( self.db.query(StorageBlock) .filter( StorageBlock.warehouse_id == warehouse.id, StorageBlock.name == new_name ) .first() ) if not exists: return new_name start_number += 1 def create_storage_block(self, warehouse: Warehouse, file_id: str) -> StorageBlock: """Create a new storage block for the given warehouse and file. Args: warehouse: Warehouse to create the block in file_id: ID of the file being processed Returns: Newly created StorageBlock Raises: ValueError: If no file is found with the given ID """ current_file = self.db.query(File).filter(File.id == file_id).first() if not current_file: raise ValueError(f"No file found with id {file_id}") storage_block_type = 'rare' if 'rare' in current_file.type else 'common' storage_block = StorageBlock( id=str(uuid4()), warehouse_id=warehouse.id, name=self.get_storage_block_name(warehouse, file_id), type=storage_block_type ) with db_transaction(self.db): self.db.add(storage_block) return storage_block def add_staged_product_to_product_block( self, staged_product: StagedFileProduct, storage_block: StorageBlock, product_attributes: ProductAttributes, block_index: int ) -> ProductBlock: """Create a new ProductBlock for a single unit of a staged product. Args: staged_product: The staged product to store storage_block: The block to store the product in product_attributes: Additional product attributes block_index: Index within the storage block Returns: Newly created ProductBlock """ product_block = ProductBlock( id=str(uuid4()), product_id=staged_product.product_id, block_id=storage_block.id, block_index=block_index ) with db_transaction(self.db): self.db.add(product_block) return product_block def get_staged_product_attributes_for_storage( self, staged_product: StagedFileProduct ) -> List[ProductAttributes]: """Get attributes for each unit of a staged product. Args: staged_product: The staged product to get attributes for Returns: List of attributes for each unit of the product """ result = ( self.db.query( StagedFileProduct.product_id, StagedFileProduct.quantity, CardTCGPlayer.number ) .join(CardTCGPlayer, CardTCGPlayer.product_id == StagedFileProduct.product_id) .filter(StagedFileProduct.id == staged_product.id) .first() ) if not result: return [] return [ ProductAttributes( product_id=result.product_id, card_number=result.number ) for _ in range(result.quantity) ] def store_staged_products_for_file(self, file_id: str) -> StorageBlock: """Store all staged products for a file in a new storage block. Args: file_id: ID of the file containing staged products Returns: The newly created StorageBlock containing all products """ warehouse = self.get_or_create_warehouse() storage_block = self.create_storage_block(warehouse, file_id) staged_products = self.get_staged_product(file_id) # Collect all product attributes first all_product_attributes = [] for staged_product in staged_products: product_attributes_list = self.get_staged_product_attributes_for_storage(staged_product) for attrs in product_attributes_list: all_product_attributes.append((staged_product, attrs)) # Sort by card number as integer to determine block indices sorted_attributes = sorted( all_product_attributes, key=lambda x: int(''.join(filter(str.isdigit, x[1]['card_number']))) ) # Add products with correct block indices for block_index, (staged_product, product_attributes) in enumerate(sorted_attributes, 1): self.add_staged_product_to_product_block( staged_product=staged_product, storage_block=storage_block, product_attributes=product_attributes, block_index=block_index ) return storage_block