256 lines
8.3 KiB
Python
256 lines
8.3 KiB
Python
from uuid import uuid4
|
|
from typing import List, TypedDict
|
|
from sqlalchemy.orm import Session
|
|
|
|
from db.utils import db_transaction
|
|
from db.models import (
|
|
Warehouse,
|
|
User,
|
|
StagedFileProduct,
|
|
StorageBlock,
|
|
ProductBlock,
|
|
File,
|
|
CardTCGPlayer
|
|
)
|
|
|
|
class ProductAttributes(TypedDict):
    """Per-unit description of a product that is about to be stored.

    One mapping of this shape is built for every unit of a staged product.
    """

    # Identifier of the product, copied from the staged product row.
    product_id: str
    # Card number taken from the CardTCGPlayer row joined on product_id.
    card_number: str
|
|
|
|
class StorageService:
    """Service for managing product storage and warehouse operations.

    Every write goes through ``db_transaction(self.db)``.
    NOTE(review): ``db_transaction`` is a project helper not visible here;
    presumably it commits on success and rolls back on error -- confirm.
    """

    def __init__(self, db: Session) -> None:
        """Initialize the storage service.

        Args:
            db: SQLAlchemy database session used for all queries and writes
        """
        self.db = db

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _card_number_sort_key(card_number: str) -> tuple:
        """Build a sort key that orders card numbers by their numeric value.

        Only decimal characters are kept (``isdecimal`` rather than
        ``isdigit``, because ``int()`` rejects characters such as ``'²'``
        that ``isdigit`` accepts). Card numbers without any decimal
        character, including empty or missing values, sort after all
        numbered cards instead of raising.

        Args:
            card_number: Raw card number, e.g. ``"SV-012"``

        Returns:
            ``(0, number)`` for numbered cards, ``(1, 0)`` otherwise
        """
        digits = ''.join(ch for ch in (card_number or '') if ch.isdecimal())
        if not digits:
            return (1, 0)
        return (0, int(digits))

    @staticmethod
    def _parse_block_number(name: str) -> int:
        """Extract the numeric suffix of a storage block name.

        Block names generated by this service are a one-letter prefix
        followed by a number (e.g. ``"r12"``).

        Args:
            name: Existing storage block name

        Returns:
            The parsed number, or 0 when the name is missing or its suffix
            is not an integer
        """
        try:
            return int(name[1:])
        except (TypeError, ValueError):
            # Missing or non-numeric name: restart numbering; the caller's
            # uniqueness loop still guarantees a free name.
            return 0

    def _get_storage_block_type(self, file_id: str) -> str:
        """Classify the storage block type for a file.

        Args:
            file_id: ID of the file being processed

        Returns:
            ``'rare'`` if the file's type contains ``'rare'``, else ``'common'``

        Raises:
            ValueError: If no file is found with the given ID
        """
        current_file = self.db.query(File).filter(File.id == file_id).first()
        if not current_file:
            raise ValueError(f"No file found with id {file_id}")

        # A missing type is treated as "not rare" rather than a TypeError.
        return 'rare' if 'rare' in (current_file.type or '') else 'common'

    # ------------------------------------------------------------------
    # Users and warehouses
    # ------------------------------------------------------------------

    def get_or_create_user(self, username: str) -> User:
        """Get an existing user or create a new one if not found.

        Args:
            username: Username to look up or create

        Returns:
            The existing or newly created User
        """
        user = self.db.query(User).filter(User.username == username).first()
        if user is None:
            user = User(
                id=str(uuid4()),
                username=username,
            )
            with db_transaction(self.db):
                self.db.add(user)
        return user

    def get_or_create_warehouse(self) -> Warehouse:
        """Get the default warehouse or create it if it doesn't exist.

        The first warehouse returned by the query is used. When none exists,
        a new one is created and owned by the ``'admin'`` user, which is
        itself created on demand.

        Returns:
            The existing or newly created Warehouse
        """
        warehouse = self.db.query(Warehouse).first()
        if warehouse is None:
            # The owner is only needed when a warehouse has to be created,
            # so skip the user lookup/creation on the common path.
            user = self.get_or_create_user('admin')
            warehouse = Warehouse(
                id=str(uuid4()),
                user_id=user.id,
            )
            with db_transaction(self.db):
                self.db.add(warehouse)
        return warehouse

    # ------------------------------------------------------------------
    # Staged products
    # ------------------------------------------------------------------

    def get_staged_product(self, file_id: str) -> List[StagedFileProduct]:
        """Get all staged products for a given file.

        Args:
            file_id: ID of the file to get staged products for

        Returns:
            List of staged products (empty if the file has none)
        """
        return self.db.query(StagedFileProduct).filter(
            StagedFileProduct.file_id == file_id
        ).all()

    # ------------------------------------------------------------------
    # Storage blocks
    # ------------------------------------------------------------------

    def get_storage_block_name(self, warehouse: Warehouse, file_id: str) -> str:
        """Generate a unique name for a new storage block.

        The name is the first letter of the block type (``'r'`` or ``'c'``)
        followed by a number. Numbering continues from the most recently
        created block of the same type in the warehouse and is bumped until
        no block with that name exists in the warehouse.

        Args:
            warehouse: Warehouse the block belongs to
            file_id: ID of the file being processed

        Returns:
            Unique storage block name

        Raises:
            ValueError: If no file is found with the given ID
        """
        storage_block_type = self._get_storage_block_type(file_id)
        prefix = storage_block_type[0]

        latest_block = (
            self.db.query(StorageBlock)
            .filter(
                StorageBlock.warehouse_id == warehouse.id,
                StorageBlock.type == storage_block_type,
            )
            .order_by(StorageBlock.date_created.desc())
            .first()
        )

        if latest_block:
            start_number = self._parse_block_number(latest_block.name) + 1
        else:
            start_number = 1

        while True:
            new_name = f"{prefix}{start_number}"
            exists = (
                self.db.query(StorageBlock)
                .filter(
                    StorageBlock.warehouse_id == warehouse.id,
                    StorageBlock.name == new_name,
                )
                .first()
            )

            if not exists:
                return new_name
            start_number += 1

    def create_storage_block(self, warehouse: Warehouse, file_id: str) -> StorageBlock:
        """Create a new storage block for the given warehouse and file.

        Args:
            warehouse: Warehouse to create the block in
            file_id: ID of the file being processed

        Returns:
            Newly created StorageBlock

        Raises:
            ValueError: If no file is found with the given ID
        """
        storage_block_type = self._get_storage_block_type(file_id)

        storage_block = StorageBlock(
            id=str(uuid4()),
            warehouse_id=warehouse.id,
            name=self.get_storage_block_name(warehouse, file_id),
            type=storage_block_type,
        )
        with db_transaction(self.db):
            self.db.add(storage_block)
        return storage_block

    # ------------------------------------------------------------------
    # Product blocks
    # ------------------------------------------------------------------

    def add_staged_product_to_product_block(
        self,
        staged_product: StagedFileProduct,
        storage_block: StorageBlock,
        product_attributes: ProductAttributes,
        block_index: int
    ) -> ProductBlock:
        """Create a new ProductBlock for a single unit of a staged product.

        Args:
            staged_product: The staged product to store
            storage_block: The block to store the product in
            product_attributes: Additional product attributes. Currently not
                read by this method; kept for interface compatibility.
            block_index: Index within the storage block

        Returns:
            Newly created ProductBlock
        """
        product_block = ProductBlock(
            id=str(uuid4()),
            product_id=staged_product.product_id,
            block_id=storage_block.id,
            block_index=block_index,
        )

        with db_transaction(self.db):
            self.db.add(product_block)

        return product_block

    def get_staged_product_attributes_for_storage(
        self,
        staged_product: StagedFileProduct
    ) -> List[ProductAttributes]:
        """Get attributes for each unit of a staged product.

        The staged product is joined to ``CardTCGPlayer`` on ``product_id``
        to obtain the card number; the first matching row is used.

        Args:
            staged_product: The staged product to get attributes for

        Returns:
            One ProductAttributes entry per unit (``quantity`` entries), or
            an empty list when no joined row exists or the quantity is
            missing or not positive
        """
        result = (
            self.db.query(
                StagedFileProduct.product_id,
                StagedFileProduct.quantity,
                CardTCGPlayer.number,
            )
            .join(CardTCGPlayer, CardTCGPlayer.product_id == StagedFileProduct.product_id)
            .filter(StagedFileProduct.id == staged_product.id)
            .first()
        )

        if result is None:
            return []

        # A NULL quantity means "no units" rather than a TypeError in range().
        unit_count = result.quantity or 0

        return [
            ProductAttributes(
                product_id=result.product_id,
                card_number=result.number,
            )
            for _ in range(unit_count)
        ]

    def store_staged_products_for_file(self, file_id: str) -> StorageBlock:
        """Store all staged products for a file in a new storage block.

        A storage block is created first, then every unit of every staged
        product is placed in it. Units are ordered by the numeric part of
        their card number and receive 1-based block indices in that order;
        units whose card number has no numeric part are placed last.

        Args:
            file_id: ID of the file containing staged products

        Returns:
            The newly created StorageBlock containing all products

        Raises:
            ValueError: If no file is found with the given ID
        """
        warehouse = self.get_or_create_warehouse()
        storage_block = self.create_storage_block(warehouse, file_id)
        staged_products = self.get_staged_product(file_id)

        # Collect one (staged_product, attributes) pair per physical unit.
        all_product_attributes = []
        for staged_product in staged_products:
            all_product_attributes.extend(
                (staged_product, attrs)
                for attrs in self.get_staged_product_attributes_for_storage(staged_product)
            )

        # Sort by numeric card number to determine block indices. sorted()
        # is stable, so units with equal keys keep their collection order.
        sorted_attributes = sorted(
            all_product_attributes,
            key=lambda unit: self._card_number_sort_key(unit[1]['card_number']),
        )

        # Add products with block indices starting at 1.
        for block_index, (staged_product, product_attributes) in enumerate(sorted_attributes, 1):
            self.add_staged_product_to_product_block(
                staged_product=staged_product,
                storage_block=storage_block,
                product_attributes=product_attributes,
                block_index=block_index,
            )

        return storage_block