import logging from dataclasses import dataclass from typing import Optional, Dict, List from sqlalchemy.orm import Session from sqlalchemy import select from app.services.base_service import BaseService from app.models.inventory_management import InventoryItem, MarketplaceListing, PhysicalItem from app.models.tcgplayer_inventory import TCGPlayerInventory from app.models.tcgplayer_products import TCGPlayerPriceHistory, TCGPlayerProduct, MTGJSONSKU from app.models.pricing import PricingEvent from app.db.database import transaction from decimal import Decimal from datetime import datetime logger = logging.getLogger(__name__) @dataclass class PriceData: cost_basis: Optional[Decimal] market_price: Optional[Decimal] tcg_low: Optional[Decimal] tcg_mid: Optional[Decimal] direct_low: Optional[Decimal] listed_price: Optional[Decimal] quantity: int lowest_price_for_qty: Optional[Decimal] velocity: Optional[Decimal] age_of_inventory: Optional[int] class PricingService(BaseService): def __init__(self): super().__init__(None) async def get_unmanaged_inventory(self, db: Session): unmanaged_inventory = db.query(TCGPlayerInventory).filter( TCGPlayerInventory.tcgplayer_sku_id.notin_( db.query(PhysicalItem.tcgplayer_sku_id).join( InventoryItem, PhysicalItem.id == InventoryItem.physical_item_id ).join( MarketplaceListing, InventoryItem.id == MarketplaceListing.inventory_item_id ).filter( MarketplaceListing.delisting_date.is_(None), MarketplaceListing.deleted_at.is_(None), InventoryItem.deleted_at.is_(None), PhysicalItem.deleted_at.is_(None) ) ), TCGPlayerInventory.total_quantity >= 1 ).all() return unmanaged_inventory async def get_managed_inventory(self, db: Session): # First get the TCGPlayerInventory IDs that are managed managed_ids = select(TCGPlayerInventory.id).join( PhysicalItem, TCGPlayerInventory.tcgplayer_sku_id == PhysicalItem.tcgplayer_sku_id ).join( InventoryItem, PhysicalItem.id == InventoryItem.physical_item_id ).join( MarketplaceListing, InventoryItem.id == MarketplaceListing.inventory_item_id ).filter( MarketplaceListing.delisting_date.is_(None), MarketplaceListing.deleted_at.is_(None), InventoryItem.deleted_at.is_(None), PhysicalItem.deleted_at.is_(None) ) # Then get just the TCGPlayerInventory data for those IDs managed_inventory = db.query(TCGPlayerInventory).filter( TCGPlayerInventory.id.in_(managed_ids) ).all() return managed_inventory async def get_pricing_data_for_unmanaged_inventory(self, db: Session) -> Dict[int, PriceData]: """Gather all pricing data for unmanaged inventory in a single query.""" unmanaged_inventory = await self.get_unmanaged_inventory(db) # Get all SKU IDs sku_ids = [inv.tcgplayer_sku_id for inv in unmanaged_inventory] # Fetch all MTGJSON SKUs and their products in one query mtgjson_skus = db.query(MTGJSONSKU).filter( MTGJSONSKU.tcgplayer_sku_id.in_(sku_ids) ).all() # Create a mapping of SKU ID to MTGJSON SKU sku_map = {sku.tcgplayer_sku_id: sku for sku in mtgjson_skus} # Create price data for each inventory item price_data_map = {} for inventory in unmanaged_inventory: mtgjson_sku = sku_map.get(inventory.tcgplayer_sku_id) if mtgjson_sku and mtgjson_sku.product and mtgjson_sku.product.most_recent_tcgplayer_price: recent_price = mtgjson_sku.product.most_recent_tcgplayer_price price_data = PriceData( cost_basis=None, market_price=Decimal(str(recent_price.market_price)) if recent_price.market_price else None, tcg_low=Decimal(str(recent_price.low_price)) if recent_price.low_price else None, tcg_mid=Decimal(str(recent_price.mid_price)) if recent_price.mid_price else None, direct_low=Decimal(str(recent_price.direct_low_price)) if recent_price.direct_low_price else None, listed_price=Decimal(str(inventory.tcg_marketplace_price)) if inventory.tcg_marketplace_price else None, quantity=inventory.total_quantity, lowest_price_for_qty=None, velocity=None, age_of_inventory=None ) price_data_map[inventory.tcgplayer_sku_id] = price_data return price_data_map async def update_prices_for_unmanaged_inventory(self, db: Session): # Get all pricing data upfront price_data_map = await self.get_pricing_data_for_unmanaged_inventory(db) # Update prices using the pre-fetched data unmanaged_inventory = await self.get_unmanaged_inventory(db) for inventory in unmanaged_inventory: price_data = price_data_map.get(inventory.tcgplayer_sku_id) if price_data: inventory.tcg_marketplace_price = await self.set_price(db, price_data) return unmanaged_inventory async def update_prices_for_managed_inventory(self, db: Session): """Update prices for managed inventory items and return updated TCGPlayerInventory data.""" managed_inventory = await self.get_managed_inventory(db) # Get all the inventory items we need in one query inventory_items = db.query(InventoryItem).join( PhysicalItem, InventoryItem.physical_item_id == PhysicalItem.id ).filter( PhysicalItem.tcgplayer_sku_id.in_([inv.tcgplayer_sku_id for inv in managed_inventory]), InventoryItem.deleted_at.is_(None) ).all() # Create a map of sku_id to inventory_item for easy lookup inventory_map = {item.physical_item.tcgplayer_sku_id: item for item in inventory_items} for tcg_inventory in managed_inventory: inventory_item = inventory_map.get(tcg_inventory.tcgplayer_sku_id) if inventory_item: pricing_event = await self.set_price_for_inventory_item(db, inventory_item) if pricing_event: tcg_inventory.tcg_marketplace_price = pricing_event.price return managed_inventory async def set_price_for_inventory_item(self, db: Session, inventory_item: InventoryItem): recent_price = inventory_item.physical_item.sku.product.most_recent_tcgplayer_price # Get the most recent active marketplace listing active_listing = None if inventory_item.marketplace_listing: active_listings = [listing for listing in inventory_item.marketplace_listing if listing.delisting_date is None and listing.deleted_at is None] if active_listings: active_listing = active_listings[0] # Get the first active listing price_data = PriceData( cost_basis=Decimal(str(inventory_item.cost_basis)) if inventory_item.cost_basis is not None else None, market_price=Decimal(str(recent_price.market_price)) if recent_price and recent_price.market_price is not None else None, tcg_low=Decimal(str(recent_price.low_price)) if recent_price and recent_price.low_price is not None else None, tcg_mid=Decimal(str(recent_price.mid_price)) if recent_price and recent_price.mid_price is not None else None, direct_low=Decimal(str(recent_price.direct_low_price)) if recent_price and recent_price.direct_low_price is not None else None, listed_price=Decimal(str(active_listing.listed_price.price)) if active_listing and active_listing.listed_price and active_listing.listed_price.price is not None else None, quantity=db.query(TCGPlayerInventory).filter( TCGPlayerInventory.tcgplayer_sku_id == inventory_item.physical_item.tcgplayer_sku_id ).first().total_quantity if db.query(TCGPlayerInventory).filter( TCGPlayerInventory.tcgplayer_sku_id == inventory_item.physical_item.tcgplayer_sku_id ).first() else 0, lowest_price_for_qty=None, velocity=None, age_of_inventory=None ) return await self.set_price(db, price_data, inventory_item) async def set_price(self, db: Session, price_data: PriceData, inventory_item: InventoryItem=None): """ TODO This sets listed_price per inventory_item but listed_price can only be applied to a product however, this may be desired on other marketplaces when generating pricing file for tcgplayer, give the option to set min, max, avg price for product? """ # Fetch base pricing data cost_basis = price_data.cost_basis market_price = price_data.market_price tcg_low = price_data.tcg_low tcg_mid = price_data.tcg_mid listed_price = price_data.listed_price if inventory_item: # average cost basis for all inventory items with the same tcgplayer_sku_id average_cost_basis = db.query(InventoryItem.cost_basis).filter( InventoryItem.physical_item_id == inventory_item.physical_item_id ).all() cost_basis_values = [row[0] for row in average_cost_basis if row[0] is not None] if cost_basis_values: cost_basis = Decimal(str(sum(cost_basis_values) / len(cost_basis_values))) logger.info(f"listed_price: {listed_price}") logger.info(f"market_price: {market_price}") logger.info(f"tcg_low: {tcg_low}") logger.info(f"tcg_mid: {tcg_mid}") logger.info(f"cost_basis: {cost_basis}") # TODO: Add logic to fetch lowest price for seller with same quantity in stock # NOT IMPLEMENTED YET lowest_price_for_quantity = Decimal('0.0') # Hardcoded configuration values (should be parameterized later) shipping_cost = Decimal('1.0') tcgplayer_shipping_fee = Decimal('1.31') average_cards_per_order = Decimal('3.0') marketplace_fee_percentage = Decimal('0.20') target_margin = Decimal('0.10') velocity_multiplier = Decimal('0.0') global_margin_multiplier = Decimal('0.00') min_floor_price = Decimal('0.25') price_drop_threshold = Decimal('0.50') # TODO add age of inventory price decrease multiplier age_of_inventory_multiplier = Decimal('0.0') # card cost margin multiplier if market_price > 0 and market_price < 2: card_cost_margin_multiplier = Decimal('-0.033') elif market_price >= 2 and market_price < 10: card_cost_margin_multiplier = Decimal('0.0') elif market_price >= 10 and market_price < 30: card_cost_margin_multiplier = Decimal('0.0125') elif market_price >= 30 and market_price < 50: card_cost_margin_multiplier = Decimal('0.025') elif market_price >= 50 and market_price < 100: card_cost_margin_multiplier = Decimal('0.033') elif market_price >= 100 and market_price < 200: card_cost_margin_multiplier = Decimal('0.05') else: card_cost_margin_multiplier = Decimal('0.0') # Fetch current total quantity in stock for SKU quantity_in_stock = price_data.quantity # Determine quantity multiplier based on stock levels if quantity_in_stock < 4: quantity_multiplier = Decimal('0.0') elif quantity_in_stock == 4: quantity_multiplier = Decimal('0.2') elif 5 <= quantity_in_stock < 10: quantity_multiplier = Decimal('0.3') elif quantity_in_stock >= 10: quantity_multiplier = Decimal('0.4') else: quantity_multiplier = Decimal('0.0') # Calculate adjusted target margin from base and global multipliers adjusted_target_margin = target_margin + global_margin_multiplier + card_cost_margin_multiplier # limit shipping cost offset to 10% of market price shipping_cost_offset = min(shipping_cost / average_cards_per_order, market_price * Decimal('0.1')) if cost_basis is None: cost_basis = tcg_mid * Decimal('0.65') # Calculate base price considering cost, shipping, fees, and margin targets base_price = (cost_basis + shipping_cost_offset) / ( (Decimal('1.0') - marketplace_fee_percentage) - adjusted_target_margin ) # Adjust base price by quantity and velocity multipliers, limit markup to amount of shipping fee adjusted_price = min( base_price * (Decimal('1.0') + quantity_multiplier + velocity_multiplier - age_of_inventory_multiplier), base_price + tcgplayer_shipping_fee ) # Adjust price based on market prices (TCG low and TCG mid) if adjusted_price < tcg_low: adjusted_price = tcg_mid price_used = "tcg mid" price_reason = "adjusted price below tcg low" elif adjusted_price > tcg_low and adjusted_price < (tcg_mid * Decimal('0.85')): adjusted_price = tcg_mid price_used = "tcg mid" price_reason = f"adjusted price below 80% of tcg mid" elif adjusted_price > (tcg_mid * Decimal('1.1')): adjusted_price = max(tcg_mid, cost_basis) price_used = "max tcg mid/cost basis" price_reason = f"adjusted price above 110% of tcg mid, using max of tcg mid and cost basis" else: price_used = "adjusted price" price_reason = "valid price assigned based on margin targets" # TODO: Add logic to adjust price to beat competitor price with same quantity # NOT IMPLEMENTED YET if adjusted_price < lowest_price_for_quantity: adjusted_price = lowest_price_for_quantity - Decimal('0.01') price_used = "lowest price for quantity" price_reason = "adjusted price below lowest price for quantity" # Fine-tune price to optimize for free shipping promotions free_shipping_adjustment = False for x in range(1, 5): quantity = Decimal(str(x)) if Decimal('5.00') <= adjusted_price * quantity <= Decimal('5.15'): adjusted_price = Decimal('4.99') / quantity free_shipping_adjustment = True break # prevent price drop over price drop threshold if listed_price and adjusted_price < (listed_price * (1 - price_drop_threshold)): adjusted_price = listed_price price_used = "listed price" price_reason = "adjusted price below price drop threshold" # Enforce minimum floor price if adjusted_price < min_floor_price: adjusted_price = min_floor_price price_used = "min floor price" price_reason = "adjusted price below min floor price" # Record pricing event in database transaction if inventory_item: with transaction(db): pricing_event = PricingEvent( inventory_item_id=inventory_item.id, price=float(adjusted_price), price_used=price_used, price_reason=price_reason, free_shipping_adjustment=free_shipping_adjustment ) db.add(pricing_event) # delete previous pricing events for inventory item if inventory_item.marketplace_listing: for listing in inventory_item.marketplace_listing: if listing.listed_price: listing.listed_price.deleted_at = datetime.now() db.flush() listing.listed_price = pricing_event return pricing_event else: return adjusted_price # BAD BAD BAD FIX PLS TODO