# Residue of a file-viewer header (not part of the module): 256 lines, 10 KiB, Python

from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import uuid4
from sqlalchemy import or_
from sqlalchemy.orm import Session
import logging
from db.models import (
Box,
File,
StagedFileProduct,
Product,
OpenBoxCard,
OpenBox,
Inventory,
TCGPlayerGroups
)
from db.utils import db_transaction
from schemas.box import CreateBoxRequest, CreateBoxResponse, UpdateBoxRequest, CreateOpenBoxRequest
from services.inventory import InventoryService
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Box types accepted by BoxService.validate_box_type.
VALID_BOX_TYPES = {
    "collector",
    "play",
    "draft",
    "set",
    "commander",
}
class BoxService:
    """Service-layer operations for boxes, opened boxes and their staged cards.

    All persistence goes through the SQLAlchemy session passed to the
    constructor; inventory changes are delegated to the injected
    ``InventoryService``.
    """

    def __init__(self, db: Session, inventory_service: InventoryService):
        """Store the database session and inventory service used by every method."""
        self.db = db
        self.inventory_service = inventory_service

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_date_opened(value: Optional[str]) -> datetime:
        """Parse a ``YYYY-MM-DD`` string, falling back to the current time.

        Args:
            value: Date string, or a falsy value (``None`` / ``""``) when the
                caller did not supply one.

        Returns:
            The parsed (naive) datetime, or ``datetime.now()`` when ``value``
            is falsy.

        Raises:
            ValueError: If ``value`` is non-empty but not in ``%Y-%m-%d`` form.
        """
        if not value:
            return datetime.now()
        return datetime.strptime(value, "%Y-%m-%d")

    def _get_box_or_raise(self, box_id: str) -> Box:
        """Return the box whose ``product_id`` equals ``box_id``.

        Raises:
            ValueError: If no such box exists.
        """
        box = self.db.query(Box).filter(Box.product_id == box_id).first()
        if box is None:
            raise ValueError("Box not found")
        return box

    # ------------------------------------------------------------------
    # Staged file / product helpers
    # ------------------------------------------------------------------

    def validate_file_ids(self, file_ids: List[str]) -> None:
        """Validate that all provided file IDs exist in the database.

        Args:
            file_ids: IDs to look up in the ``File`` table.

        Raises:
            ValueError: If one or more IDs are not found; the message lists
                them in the order they were given.
        """
        if not file_ids:
            return

        # One IN query instead of one query per ID.
        rows = self.db.query(File.id).filter(File.id.in_(file_ids)).all()
        known_ids = {found_id for (found_id,) in rows}

        invalid_files = [
            file_id for file_id in file_ids if file_id not in known_ids
        ]
        if invalid_files:
            raise ValueError(f"File IDs not found: {', '.join(invalid_files)}")

    def get_staged_product_data(self, file_ids: List[str]) -> List[StagedFileProduct]:
        """Retrieve the staged product rows belonging to the given file IDs."""
        return self.db.query(StagedFileProduct).filter(
            StagedFileProduct.file_id.in_(file_ids)
        ).all()

    def aggregate_staged_product_data(
        self, staged_product_data: List[StagedFileProduct]
    ) -> Dict[Product, int]:
        """Aggregate staged rows into a total quantity per product.

        Rows whose ``product_id`` does not match an existing ``Product`` are
        ignored.

        Args:
            staged_product_data: Staged rows to sum up.

        Returns:
            Mapping of ``Product`` to the summed ``quantity`` of its rows, in
            order of each product's first appearance.
        """
        wanted_ids = {
            row.product_id
            for row in staged_product_data
            if row.product_id is not None
        }
        if not wanted_ids:
            return {}

        # Load every referenced product in one query rather than one per row.
        products = self.db.query(Product).filter(
            Product.id.in_(list(wanted_ids))
        ).all()
        products_by_id = {product.id: product for product in products}

        product_data: Dict[Product, int] = {}
        for row in staged_product_data:
            product = products_by_id.get(row.product_id)
            if product is not None:
                product_data[product] = product_data.get(product, 0) + row.quantity
        return product_data

    # ------------------------------------------------------------------
    # Record construction helpers (objects are added to the session only;
    # committing is left to the caller's transaction)
    # ------------------------------------------------------------------

    def find_product_for_box_data(self, create_box_data: Dict[str, Any]) -> Optional[Product]:
        """Find an existing ``box`` product matching name, set and product line.

        Returns:
            The first matching ``Product``, or ``None`` if there is none.
        """
        return self.db.query(Product).filter(
            Product.name == create_box_data["name"],
            Product.type == "box",
            Product.set_code == create_box_data["set_code"],
            Product.set_name == create_box_data["set_name"],
            Product.product_line == create_box_data["product_line"]
        ).first()

    def create_product_for_box(self, create_box_data: Dict[str, Any]) -> Product:
        """Create a new product of type ``box`` and add it to the session."""
        product = Product(
            id=str(uuid4()),
            name=create_box_data["name"],
            type="box",
            set_code=create_box_data["set_code"],
            set_name=create_box_data["set_name"],
            product_line=create_box_data["product_line"]
        )
        self.db.add(product)
        return product

    def create_box_db(self, product: Product, create_box_data: Dict[str, Any]) -> Box:
        """Create a box record for ``product`` and add it to the session."""
        box = Box(
            product_id=product.id,
            type=create_box_data["type"],
            sku=create_box_data["sku"],
            num_cards_expected=create_box_data["num_cards_expected"]
        )
        self.db.add(box)
        return box

    def create_open_box(self, product: Product, create_box_data: Dict[str, Any]) -> OpenBox:
        """Create an open-box record for ``product`` and add it to the session.

        A missing or empty ``date_opened`` falls back to the current time,
        matching the behaviour of :meth:`open_box`.
        """
        open_box = OpenBox(
            id=str(uuid4()),
            product_id=product.id,
            num_cards_actual=create_box_data["num_cards_actual"],
            date_opened=self._parse_date_opened(create_box_data.get("date_opened"))
        )
        self.db.add(open_box)
        return open_box

    def add_products_to_open_box(self, open_box: OpenBox, product_data: Dict[Product, int]) -> None:
        """Add one ``OpenBoxCard`` per product/quantity pair to the session."""
        for product, quantity in product_data.items():
            open_box_card = OpenBoxCard(
                id=str(uuid4()),
                open_box_id=open_box.id,
                card_id=product.id,
                quantity=quantity
            )
            self.db.add(open_box_card)

    def format_response(
        self,
        open_box: Optional[OpenBox] = None,
        inventory: Optional[Inventory] = None
    ) -> CreateBoxResponse:
        """Format the response for box creation.

        Both arguments are currently unused; the response only reports success.
        """
        return CreateBoxResponse(success=True)

    # ------------------------------------------------------------------
    # Box creation from raw dict data
    # ------------------------------------------------------------------

    def _create_box(
        self,
        create_box_data: Dict[str, Any],
        file_ids: Optional[List[str]] = None
    ) -> CreateBoxResponse:
        """Internal method to handle box creation logic.

        A sealed box is added to inventory as a single unit. An unsealed box
        gets an open-box record and, when ``file_ids`` are given, the staged
        products of those files are processed into inventory, attached to the
        open box, and the files are marked ``processed``.

        Args:
            create_box_data: Box attributes; must contain ``sealed`` plus the
                keys read by the product/box/open-box helpers.
            file_ids: Optional IDs of files holding staged card data.

        Returns:
            The formatted creation response.

        Raises:
            ValueError: If ``file_ids`` are supplied for a sealed box, or if
                any file ID does not exist.
        """
        sealed = create_box_data["sealed"]
        if file_ids and sealed:
            raise ValueError("Cannot add cards with a sealed box")

        product_data: Dict[Product, int] = {}
        if file_ids:
            # Reaching here with file_ids means the box is not sealed.
            self.validate_file_ids(file_ids)
            staged_product_data = self.get_staged_product_data(file_ids)
            product_data = self.aggregate_staged_product_data(staged_product_data)

        box_product = self.find_product_for_box_data(create_box_data)

        try:
            # NOTE(review): db_transaction presumably commits on success and
            # rolls back on error - confirm against db.utils.
            with db_transaction(self.db):
                if not box_product:
                    box_product = self.create_product_for_box(create_box_data)
                self.create_box_db(box_product, create_box_data)

                if not sealed:
                    open_box = self.create_open_box(box_product, create_box_data)
                    if file_ids:
                        self.inventory_service.process_staged_products(product_data)
                        self.add_products_to_open_box(open_box, product_data)
                        # Update file statuses to processed
                        self.db.query(File).filter(File.id.in_(file_ids)).update(
                            {"status": "processed"}, synchronize_session=False
                        )
                    return self.format_response(open_box=open_box)
                else:
                    inventory = self.inventory_service.add_sealed_box_to_inventory(box_product, 1)
                    return self.format_response(inventory=inventory)
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("Error creating box: %s", e)
            raise

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    def validate_box_type(self, box_type: str) -> bool:
        """Return whether ``box_type`` is one of ``VALID_BOX_TYPES``."""
        return box_type in VALID_BOX_TYPES

    def validate_set_code(self, set_code: str) -> bool:
        """Return whether a TCGPlayer group with this abbreviation exists."""
        return self.db.query(TCGPlayerGroups).filter(
            TCGPlayerGroups.abbreviation == set_code
        ).first() is not None

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------

    def create_box(self, create_box_data: CreateBoxRequest) -> Box:
        """Create a new box.

        Args:
            create_box_data: Request carrying ``type``, ``set_code``, ``sku``
                and ``num_cards_expected``.

        Returns:
            The newly created ``Box``.

        Raises:
            ValueError: If the box type or set code is invalid, or a
                conflicting box already exists.
        """
        if not self.validate_box_type(create_box_data.type):
            raise ValueError("Invalid box type")
        if not self.validate_set_code(create_box_data.set_code):
            raise ValueError("Invalid set code")

        # A box conflicts when it is in the same set AND shares either the
        # type or the SKU.
        # NOTE(review): matching on type alone means only one box per type
        # per set can exist - confirm this is intended.
        existing_box = self.db.query(Box).filter(
            or_(
                Box.type == create_box_data.type,
                Box.sku == create_box_data.sku
            ),
            Box.set_code == create_box_data.set_code
        ).first()
        if existing_box:
            raise ValueError("Box already exists")

        with db_transaction(self.db):
            box = Box(
                product_id=str(uuid4()),
                type=create_box_data.type,
                set_code=create_box_data.set_code,
                sku=create_box_data.sku,
                num_cards_expected=create_box_data.num_cards_expected
            )
            self.db.add(box)
        return box

    def update_box(self, box_id: str, update_box_data: UpdateBoxRequest) -> Box:
        """Update an existing box with the explicitly set, non-None fields.

        Args:
            box_id: ``product_id`` of the box to update.
            update_box_data: Partial update; unset and ``None`` fields are
                left untouched.

        Returns:
            The updated ``Box``.

        Raises:
            ValueError: If the box does not exist, or the new type or set
                code is invalid.
        """
        box = self._get_box_or_raise(box_id)

        # Drop None values once so validation and assignment agree.
        update_data = {
            field: value
            for field, value in update_box_data.dict(exclude_unset=True).items()
            if value is not None
        }

        if "type" in update_data and not self.validate_box_type(update_data["type"]):
            raise ValueError(f"Invalid box type: {update_data['type']}")
        if "set_code" in update_data and not self.validate_set_code(update_data["set_code"]):
            raise ValueError(f"Invalid set code: {update_data['set_code']}")

        with db_transaction(self.db):
            for field, value in update_data.items():
                setattr(box, field, value)
        return box

    def delete_box(self, box_id: str) -> Box:
        """Delete a box and return the deleted record.

        Raises:
            ValueError: If the box does not exist.
        """
        box = self._get_box_or_raise(box_id)
        with db_transaction(self.db):
            self.db.delete(box)
        return box

    def open_box(self, box_id: str, box_data: CreateOpenBoxRequest) -> OpenBox:
        """Open a box and process its contents.

        Creates an ``OpenBox`` for ``box_id``. When the request lists file
        IDs, their staged products are processed into inventory, attached to
        the open box, and the files' ``box_id`` is set to the open box's ID.

        Args:
            box_id: ``product_id`` of the box being opened.
            box_data: Request with ``num_cards_actual``, optional
                ``date_opened`` (``YYYY-MM-DD``) and optional ``file_ids``.

        Returns:
            The newly created ``OpenBox``.

        Raises:
            ValueError: If the box does not exist.
        """
        self._get_box_or_raise(box_id)

        # Treat a missing list like an empty one instead of crashing in IN().
        file_ids = list(box_data.file_ids or [])

        with db_transaction(self.db):
            opened = OpenBox(
                id=str(uuid4()),
                product_id=box_id,
                num_cards_actual=box_data.num_cards_actual,
                date_opened=self._parse_date_opened(box_data.date_opened)
            )
            self.db.add(opened)

            if file_ids:
                staged_product_data = self.get_staged_product_data(file_ids)
                product_data = self.aggregate_staged_product_data(staged_product_data)
                self.inventory_service.process_staged_products(product_data)
                self.add_products_to_open_box(opened, product_data)
                # Update file box IDs
                self.db.query(File).filter(File.id.in_(file_ids)).update(
                    {"box_id": opened.id}, synchronize_session=False
                )
        return opened