from typing import List, Optional, Dict, TypedDict from sqlalchemy.orm import Session, joinedload from decimal import Decimal from app.services.base_service import BaseService from app.models.manabox_import_staging import ManaboxImportStaging from app.contexts.inventory_item import InventoryItemContextFactory from app.models.inventory_management import ( OpenEvent, Card, InventoryItem, Case, SealedExpectedValue, Transaction, TransactionItem, Customer, Vendor, Marketplace, Box, MarketplaceListing ) from app.schemas.file import FileInDB from app.models.inventory_management import PhysicalItem from app.schemas.transaction import PurchaseTransactionCreate, SaleTransactionCreate, TransactionResponse, SealedExpectedValueCreate from app.db.database import transaction as db_transaction from datetime import datetime from typing import Any import logging logger = logging.getLogger(__name__) class InventoryService(BaseService): def __init__(self): super().__init__(None) async def get_resulting_items_for_open_event(self, db: Session, open_event: OpenEvent) -> List[InventoryItem]: # Get the IDs of resulting items resulting_item_ids = [item.id for item in open_event.resulting_items] # Query using the IDs return db.query(InventoryItem).filter(InventoryItem.physical_item_id.in_(resulting_item_ids)).filter(InventoryItem.deleted_at == None).all() async def get_open_event(self, db: Session, inventory_item: InventoryItem, open_event_id: int) -> OpenEvent: return db.query(OpenEvent).filter(OpenEvent.source_item == inventory_item.physical_item).filter(OpenEvent.id == open_event_id).filter(OpenEvent.deleted_at == None).first() async def get_open_events_for_inventory_item(self, db: Session, inventory_item: InventoryItem) -> List[OpenEvent]: return db.query(OpenEvent).filter(OpenEvent.source_item == inventory_item.physical_item).filter(OpenEvent.deleted_at == None).all() async def get_inventory_item(self, db: Session, inventory_item_id: int) -> InventoryItem: return db.query(InventoryItem)\ .options( joinedload(InventoryItem.physical_item).joinedload(PhysicalItem.product_direct) )\ .filter(InventoryItem.id == inventory_item_id)\ .first() async def get_expected_value(self, db: Session, product_id: int) -> float: expected_value = db.query(SealedExpectedValue).filter(SealedExpectedValue.tcgplayer_product_id == product_id).filter(SealedExpectedValue.deleted_at == None).first() return expected_value.expected_value if expected_value else None async def get_transactions(self, db: Session, skip: int, limit: int) -> List[Transaction]: return db.query(Transaction)\ .filter(Transaction.deleted_at == None)\ .order_by(Transaction.transaction_date.desc())\ .offset(skip)\ .limit(limit)\ .all() async def get_transaction(self, db: Session, transaction_id: int) -> Transaction: return db.query(Transaction)\ .options( joinedload(Transaction.transaction_items).joinedload(TransactionItem.inventory_item).joinedload(InventoryItem.physical_item).joinedload(PhysicalItem.product_direct), joinedload(Transaction.vendors), joinedload(Transaction.customers), joinedload(Transaction.marketplaces) )\ .filter(Transaction.id == transaction_id)\ .filter(Transaction.deleted_at == None)\ .first() async def create_expected_value(self, db: Session, expected_value_data: SealedExpectedValueCreate) -> SealedExpectedValue: with db_transaction(db): expected_value = SealedExpectedValue( tcgplayer_product_id=expected_value_data.tcgplayer_product_id, expected_value=expected_value_data.expected_value ) db.add(expected_value) db.flush() return expected_value async def create_purchase_transaction( self, db: Session, transaction_data: PurchaseTransactionCreate ) -> Transaction: """ Creates a purchase transaction from a vendor. For each item: 1. Creates a PhysicalItem (SealedCase/SealedBox) 2. Creates an InventoryItem with the purchase price as cost basis 3. Creates TransactionItems linking the purchase to the items """ try: with db_transaction(db): # Create the transaction transaction = Transaction( vendor_id=transaction_data.vendor_id, transaction_type='purchase', transaction_date=transaction_data.transaction_date, transaction_notes=transaction_data.transaction_notes ) db.add(transaction) db.flush() total_amount = 0 physical_items = [] case_service = self.get_service("case") box_service = self.get_service("box") for item in transaction_data.items: # Create the physical item based on type # TODO: remove is_case and num_boxes, should derive from product_id # TODO: add support for purchasing single cards if item.item_type == "case": for i in range(item.quantity): physical_item = await case_service.create_case( db=db, product_id=item.product_id, cost_basis=item.unit_price, num_boxes=item.num_boxes ) physical_items.append(physical_item) elif item.item_type == "box": for i in range(item.quantity): physical_item = await box_service.create_box( db=db, product_id=item.product_id, cost_basis=item.unit_price ) physical_items.append(physical_item) else: raise ValueError(f"Invalid item type: {item.item_type}") # TODO: add support for purchasing single cards for physical_item in physical_items: # Create transaction item transaction.transaction_items.append(TransactionItem( inventory_item_id=physical_item.inventory_item.id, unit_price=item.unit_price )) # Update transaction total transaction.transaction_total_amount = total_amount return transaction except Exception as e: raise e async def create_customer( self, db: Session, customer_name: str ) -> Customer: try: # check if customer already exists existing_customer = db.query(Customer).filter(Customer.name == customer_name).first() if existing_customer: return existing_customer with db_transaction(db): customer = Customer( name=customer_name ) db.add(customer) db.flush() return customer except Exception as e: raise e async def create_vendor( self, db: Session, vendor_name: str ) -> Vendor: try: # check if vendor already exists existing_vendor = db.query(Vendor).filter(Vendor.name == vendor_name).first() if existing_vendor: return existing_vendor with db_transaction(db): vendor = Vendor( name=vendor_name ) db.add(vendor) db.flush() return vendor except Exception as e: raise e async def get_vendors( self, db: Session ) -> List[Vendor]: return db.query(Vendor).all() async def create_marketplace( self, db: Session, marketplace_name: str ) -> Marketplace: try: # check if marketplace already exists existing_marketplace = db.query(Marketplace).filter(Marketplace.name == marketplace_name).first() if existing_marketplace: return existing_marketplace with db_transaction(db): marketplace = Marketplace( name=marketplace_name ) db.add(marketplace) db.flush() return marketplace except Exception as e: raise e async def get_marketplaces( self, db: Session ) -> List[Marketplace]: return db.query(Marketplace).all() class BoxService(BaseService[Box]): def __init__(self): super().__init__(Box) async def create_box( self, db: Session, product_id: int, cost_basis: float ) -> Box: try: with db_transaction(db): # Create the SealedBox box = Box( tcgplayer_product_id=product_id ) db.add(box) db.flush() # Get the ID for relationships expected_value = box.products.sealed_expected_value.expected_value box.expected_value = expected_value db.flush() # Create the InventoryItem for the sealed box inventory_item = InventoryItem( physical_item=box, cost_basis=cost_basis ) db.add(inventory_item) return box except Exception as e: raise e async def calculate_cost_basis_for_opened_cards(self, db: Session, open_event: OpenEvent) -> float: box_cost_basis = open_event.source_item.inventory_item.cost_basis box_expected_value = open_event.source_item.products.sealed_expected_value.expected_value for resulting_card in open_event.resulting_items: # ensure card if resulting_card.item_type != "card": raise ValueError(f"Expected card, got {resulting_card.item_type}") resulting_card_market_value = resulting_card.products.most_recent_tcgplayer_price.market_price resulting_card_cost_basis = (resulting_card_market_value / box_expected_value) * box_cost_basis resulting_card.inventory_item.cost_basis = resulting_card_cost_basis db.flush() async def open_box(self, db: Session, box: Box, manabox_file_uploads: List[FileInDB]) -> bool: with db_transaction(db): # create open event open_event = OpenEvent( source_item=box, open_date=datetime.now() ) db.add(open_event) db.flush() manabox_upload_ids = [manabox_file_upload.id for manabox_file_upload in manabox_file_uploads] staging_data = db.query(ManaboxImportStaging).filter(ManaboxImportStaging.file_id.in_(manabox_upload_ids)).all() for record in staging_data: for i in range(record.quantity): open_card = Card( tcgplayer_product_id=record.tcgplayer_product_id, tcgplayer_sku_id=record.tcgplayer_sku_id ) open_event.resulting_items.append(open_card) inventory_item = InventoryItem( physical_item=open_card, cost_basis=0 ) db.add(inventory_item) db.flush() # calculate cost basis for opened cards await self.calculate_cost_basis_for_opened_cards(db, open_event) return open_event class CaseService(BaseService[Case]): def __init__(self): super().__init__(Case) async def create_case(self, db: Session, product_id: int, cost_basis: float, num_boxes: int) -> Case: try: with db_transaction(db): # Create the SealedCase case = Case( tcgplayer_product_id=product_id, num_boxes=num_boxes ) db.add(case) db.flush() # Get the ID for relationships case.expected_value = case.products.sealed_expected_value.expected_value # Create the InventoryItem for the sealed case inventory_item = InventoryItem( physical_item=case, cost_basis=cost_basis ) db.add(inventory_item) return case except Exception as e: raise e async def open_case(self, db: Session, case: Case, child_product_id: int) -> bool: try: ## TODO should be able to import a manabox file with a case ## cost basis will be able to flow down to the card accurately with db_transaction(db): # Create the OpenEvent open_event = OpenEvent( source_item=case, open_date=datetime.now() ) db.add(open_event) db.flush() # Get the ID for relationships # Create num_boxes SealedBoxes for i in range(case.num_boxes): new_box = Box( tcgplayer_product_id=child_product_id ) open_event.resulting_items.append(new_box) db.flush() per_box_cost_basis = case.inventory_item.cost_basis / case.num_boxes # Create the InventoryItem for the sealed box inventory_item = InventoryItem( physical_item=new_box, cost_basis=per_box_cost_basis ) db.add(inventory_item) return True except Exception as e: raise e class MarketplaceListingService(BaseService[MarketplaceListing]): def __init__(self): super().__init__(MarketplaceListing) self.pricing_service = self.service_manager.get_service("pricing") async def create_marketplace_listing(self, db: Session, inventory_item: InventoryItem, marketplace: Marketplace) -> MarketplaceListing: try: with db_transaction(db): recommended_price = await self.pricing_service.set_price(db, inventory_item) logger.info(f"recommended_price: {recommended_price.price}") marketplace_listing = MarketplaceListing( inventory_item=inventory_item, marketplace=marketplace, recommended_price=recommended_price, listing_date=None, delisting_date=None ) db.add(marketplace_listing) db.flush() return marketplace_listing except Exception as e: raise e async def update_marketplace_listing_price(self, db: Session, marketplace_listing: MarketplaceListing) -> MarketplaceListing: try: with db_transaction(db): marketplace_listing.listed_price = self.pricing_service.set_price(marketplace_listing.inventory_item) db.flush() return marketplace_listing except Exception as e: raise e async def get_marketplace_listing(self, db: Session, inventory_item: InventoryItem, marketplace: Marketplace) -> MarketplaceListing: return db.query(MarketplaceListing).filter(MarketplaceListing.inventory_item == inventory_item).filter(MarketplaceListing.delisting_date == None).filter(MarketplaceListing.deleted_at == None).filter(MarketplaceListing.marketplace == marketplace).order_by(MarketplaceListing.created_at.desc()).first() async def get_active_marketplace_listing(self, db: Session, inventory_item: InventoryItem, marketplace: Marketplace) -> MarketplaceListing: return db.query(MarketplaceListing).filter(MarketplaceListing.inventory_item == inventory_item).filter(MarketplaceListing.delisting_date == None).filter(MarketplaceListing.deleted_at == None).filter(MarketplaceListing.marketplace == marketplace).filter(MarketplaceListing.listing_date != None).order_by(MarketplaceListing.created_at.desc()).first() async def confirm_listings(self, db: Session, open_event: OpenEvent, marketplace: Marketplace) -> str: tcgplayer_add_file = await self.create_tcgplayer_add_file(db, open_event, marketplace) if not tcgplayer_add_file: raise ValueError("No TCGplayer add file created") with db_transaction(db): for resulting_item in open_event.resulting_items: marketplace_listing = await self.get_marketplace_listing(db, resulting_item.inventory_item, marketplace) if marketplace_listing is None: raise ValueError(f"No active marketplace listing found for inventory item id {resulting_item.inventory_item.id} in {marketplace.name}") marketplace_listing.listing_date = datetime.now() db.flush() return tcgplayer_add_file async def create_tcgplayer_add_file(self, db: Session, open_event: OpenEvent, marketplace: Marketplace) -> str: # TCGplayer Id,Product Line,Set Name,Product Name,Title,Number,Rarity,Condition,TCG Market Price,TCG Direct Low,TCG Low Price With Shipping,TCG Low Price,Total Quantity,Add to Quantity,TCG Marketplace Price,Photo URL headers = [ "TCGplayer Id", "Product Line", "Set Name", "Product Name", "Title", "Number", "Rarity", "Condition", "TCG Market Price", "TCG Direct Low", "TCG Low Price With Shipping", "TCG Low Price", "Total Quantity", "Add to Quantity", "TCG Marketplace Price", "Photo URL" ] data = {} for resulting_item in open_event.resulting_items: marketplace_listing = await self.get_marketplace_listing(db, resulting_item.inventory_item, marketplace) if marketplace_listing is None: raise ValueError(f"No active marketplace listing found for inventory item id {resulting_item.inventory_item.id} in {marketplace.name}") tcgplayer_sku_id = resulting_item.tcgplayer_sku_id if tcgplayer_sku_id in data: data[tcgplayer_sku_id]["Add to Quantity"] += 1 continue product_line = resulting_item.products.category.name set_name = resulting_item.products.group.name product_name = resulting_item.products.name title = "" number = resulting_item.products.ext_number rarity = resulting_item.products.ext_rarity condition = " ".join([condition.capitalize() for condition in resulting_item.sku.condition.split(" ")]) + (" " + resulting_item.products.sub_type_name if resulting_item.products.sub_type_name == "Foil" else "") tcg_market_price = resulting_item.products.most_recent_tcgplayer_price.market_price tcg_direct_low = resulting_item.products.most_recent_tcgplayer_price.direct_low_price tcg_low_price_with_shipping = resulting_item.products.most_recent_tcgplayer_price.low_price tcg_low_price = resulting_item.products.most_recent_tcgplayer_price.low_price total_quantity = "" add_to_quantity = 1 # get average recommended price of product # get inventory items with same tcgplayer_product_id inventory_items = db.query(InventoryItem).filter(InventoryItem.physical_item.has(tcgplayer_sku_id=tcgplayer_sku_id)).all() inventory_item_ids = [inventory_item.id for inventory_item in inventory_items] logger.debug(f"inventory_item_ids: {inventory_item_ids}") valid_listings = db.query(MarketplaceListing).filter(MarketplaceListing.inventory_item_id.in_(inventory_item_ids)).filter(MarketplaceListing.delisting_date == None).filter(MarketplaceListing.deleted_at == None).filter(MarketplaceListing.listing_date == None).all() logger.debug(f"valid_listings: {valid_listings}") avg_recommended_price = sum([listing.recommended_price.price for listing in valid_listings]) / len(valid_listings) data[tcgplayer_sku_id] = { "TCGplayer Id": tcgplayer_sku_id, "Product Line": product_line, "Set Name": set_name, "Product Name": product_name, "Title": title, "Number": number, "Rarity": rarity, "Condition": condition, "TCG Market Price": tcg_market_price, "TCG Direct Low": tcg_direct_low, "TCG Low Price With Shipping": tcg_low_price_with_shipping, "TCG Low Price": tcg_low_price, "Total Quantity": total_quantity, "Add to Quantity": add_to_quantity, "TCG Marketplace Price": f"{Decimal(avg_recommended_price):.2f}", "Photo URL": "" } # format data into csv # header header_row = ",".join(headers) # data def escape_csv_value(value): if value is None: return "" value = str(value) if any(c in value for c in [',', '"', '\n']): return f'"{value.replace('"', '""')}"' return value data_rows = [",".join([escape_csv_value(data[tcgplayer_id][header]) for header in headers]) for tcgplayer_id in data] csv_data = "\n".join([header_row] + data_rows) return csv_data