from dataclasses import dataclass from decimal import Decimal, ROUND_HALF_UP from enum import Enum from typing import Optional, Dict, List, Any import pandas as pd import logging from db.models import Product, Price from sqlalchemy.orm import Session from uuid import uuid4 as uuid from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed from sqlalchemy import text from services.util._dataframe import DataframeUtil logger = logging.getLogger(__name__) class PriceType(str, Enum): TCG_MARKET = 'tcg_market_price' TCG_DIRECT_LOW = 'tcg_direct_low' TCG_LOW_WITH_SHIPPING = 'tcg_low_price_with_shipping' TCG_LOW = 'tcg_low_price' TCG_MARKETPLACE = 'tcg_marketplace_price' MY_PRICE = 'my_price' class PricingStrategy(str, Enum): DEFAULT = 'default' AGGRESSIVE = 'aggressive' CONSERVATIVE = 'conservative' @dataclass class PriceRange: min_price: Decimal max_price: Decimal multiplier: Decimal ceiling_price: Optional[Decimal] = None include_shipping: bool = False def __post_init__(self): # Convert all values to Decimal for precise calculations self.min_price = Decimal(str(self.min_price)) self.max_price = Decimal(str(self.max_price)) self.multiplier = Decimal(str(self.multiplier)) if self.ceiling_price is not None: self.ceiling_price = Decimal(str(self.ceiling_price)) def contains_price(self, price: Decimal) -> bool: """Check if a price falls within this range, inclusive of min, exclusive of max.""" return self.min_price <= price < self.max_price def calculate_price(self, base_price: Decimal) -> Decimal: """Calculate the final price for this range, respecting ceiling.""" calculated = base_price * self.multiplier if self.ceiling_price is not None: calculated = min(calculated, self.ceiling_price) return calculated.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP) class PricingConfiguration: """Centralized configuration for pricing rules and thresholds.""" # Price thresholds FLOOR_PRICE = Decimal('0.35') MAX_PRICE = Decimal('100000.00') # Safety cap for maximum price SHIPPING_THRESHOLD = Decimal('5.00') # Multipliers FLOOR_MULT = Decimal('1.25') NEAR_FLOOR_MULT = Decimal('1.25') UNDER_FIVE_MULT = Decimal('1.25') FIVE_TO_TEN_MULT = Decimal('1.15') TEN_TO_TWENTYFIVE_MULT = Decimal('1.10') TWENTYFIVE_TO_FIFTY_MULT = Decimal('1.05') FIFTY_PLUS_MULT = Decimal('1.025') # Price variance thresholds MAX_PRICE_VARIANCE = Decimal('0.50') # Maximum allowed variance between prices as a ratio @classmethod def get_price_ranges(cls) -> list[PriceRange]: """Get the list of price ranges with their respective rules.""" return [ PriceRange( min_price=Decimal('0'), max_price=cls.FLOOR_PRICE, multiplier=cls.FLOOR_MULT, include_shipping=False ), PriceRange( min_price=cls.FLOOR_PRICE, max_price=Decimal('5'), multiplier=cls.UNDER_FIVE_MULT, ceiling_price=Decimal('4.99'), include_shipping=False ), PriceRange( min_price=Decimal('5'), max_price=Decimal('10'), multiplier=cls.FIVE_TO_TEN_MULT, ceiling_price=Decimal('9.99'), include_shipping=True ), PriceRange( min_price=Decimal('10'), max_price=Decimal('25'), multiplier=cls.TEN_TO_TWENTYFIVE_MULT, ceiling_price=Decimal('24.99'), include_shipping=True ), PriceRange( min_price=Decimal('25'), max_price=Decimal('50'), multiplier=cls.TWENTYFIVE_TO_FIFTY_MULT, ceiling_price=Decimal('49.99'), include_shipping=True ), PriceRange( min_price=Decimal('50'), max_price=cls.MAX_PRICE, multiplier=cls.FIFTY_PLUS_MULT, include_shipping=True ) ] class PriceCalculationResult: """Represents the result of a price calculation.""" def __init__( self, product: Product, calculated_price: Optional[Decimal], base_prices: Dict[str, Decimal], error: Optional[str] = None ): self.product = product self.calculated_price = calculated_price self.base_prices = base_prices self.error = error @property def success(self) -> bool: return self.calculated_price is not None and self.error is None @property def max_base_price(self) -> Optional[Decimal]: """Returns the highest base price.""" return max(self.base_prices.values()) if self.base_prices else None class PricingService: CHUNK_SIZE = 5000 # Configurable batch size MAX_WORKERS = 4 # Configurable worker count def __init__(self, db: Session): self.db = db self.df_util = DataframeUtil() self.config = PricingConfiguration self.price_ranges = self.config.get_price_ranges() def get_product_by_id(self, product_id: str) -> Optional[Product]: """Get a product by its ID.""" return self.db.query(Product)\ .filter(Product.id == str(product_id))\ .all()[0] if len(self.db.query(Product)\ .filter(Product.id == str(product_id))\ .all()) > 0 else None def get_latest_price_for_product(self, product: Product, price_type: PriceType) -> Optional[Price]: """Get the most recent price of a specific type for a product.""" prices = self.db.query(Price)\ .filter( Price.product_id == str(product.id), Price.type == price_type.value )\ .order_by(Price.date_created.desc())\ .all() return prices[0] if prices else None def get_historical_prices_for_product( self, product: Product, price_type: Optional[PriceType] = None ) -> dict[PriceType, list[Price]]: """Get historical prices for a product, optionally filtered by type.""" query = self.db.query(Price).filter(Price.product_id == str(product.id)) if price_type: query = query.filter(Price.type == price_type.value) # Fixed: Use enum value prices = query.order_by(Price.date_created.desc()).all() if price_type: return {price_type: prices} # Group prices by type result = {t: [] for t in PriceType} for price in prices: result[PriceType(price.type)].append(price) # Fixed: Convert string to enum return result def _validate_price_data(self, prices: dict[str, Optional[Price]]) -> Optional[str]: """Validate price data and return error message if invalid.""" # Filter out None values and get valid prices valid_prices = {k: v for k, v in prices.items() if v is not None} if not valid_prices: return "No valid price data available" for price in valid_prices.values(): if price.price < 0: return f"Negative price found: {price.price}" if price.price > self.config.MAX_PRICE: return f"Price exceeds maximum allowed: {price.price}" return None def _check_price_variance(self, prices: Dict[str, Decimal]) -> bool: """Check if the variance between prices is within acceptable limits.""" if not prices: return True min_price = min(prices.values()) max_price = max(prices.values()) if min_price == 0: return False variance_ratio = max_price / min_price return variance_ratio <= (1 + self.config.MAX_PRICE_VARIANCE) def _get_relevant_prices(self, product: Product) -> dict[str, Optional[Price]]: """Get all relevant prices for a product.""" return { PriceType.TCG_LOW.value: self.get_latest_price_for_product(product, PriceType.TCG_LOW), PriceType.TCG_DIRECT_LOW.value: self.get_latest_price_for_product(product, PriceType.TCG_DIRECT_LOW), PriceType.TCG_MARKET.value: self.get_latest_price_for_product(product, PriceType.TCG_MARKET), PriceType.TCG_LOW_WITH_SHIPPING.value: self.get_latest_price_for_product(product, PriceType.TCG_LOW_WITH_SHIPPING) } def _get_base_prices( self, prices: dict[str, Price], include_shipping: bool = False ) -> Dict[str, Decimal]: """Get base prices, excluding None values.""" base_prices = {} # Add core prices if they exist if tcg_low := prices.get(PriceType.TCG_LOW.value): base_prices[PriceType.TCG_LOW.value] = Decimal(str(tcg_low.price)) if tcg_direct := prices.get(PriceType.TCG_DIRECT_LOW.value): base_prices[PriceType.TCG_DIRECT_LOW.value] = Decimal(str(tcg_direct.price)) if tcg_market := prices.get(PriceType.TCG_MARKET.value): base_prices[PriceType.TCG_MARKET.value] = Decimal(str(tcg_market.price)) # Add shipping price if requested and available if include_shipping: if tcg_shipping := prices.get(PriceType.TCG_LOW_WITH_SHIPPING.value): base_prices[PriceType.TCG_LOW_WITH_SHIPPING.value] = Decimal(str(tcg_shipping.price)) return base_prices def _get_price_range(self, price: Decimal) -> Optional[PriceRange]: """Get the appropriate price range for a given price.""" for price_range in self.price_ranges: if price_range.contains_price(price): return price_range return None def _handle_floor_price_cases( self, base_prices: Dict[str, Decimal] ) -> Optional[Decimal]: """Handle special cases for prices near or below floor price.""" if all(price < self.config.FLOOR_PRICE for price in base_prices.values()): return self.config.FLOOR_PRICE if any(price < self.config.FLOOR_PRICE for price in base_prices.values()): max_price = max(base_prices.values()) return max_price * self.config.NEAR_FLOOR_MULT return None def calculate_price( self, product_id: str, strategy: PricingStrategy = PricingStrategy.DEFAULT ) -> PriceCalculationResult: """Calculate the final price for a product using the specified pricing strategy.""" # get product product = self.get_product_by_id(str(product_id)) # Fixed: Ensure string UUID if not product: logger.error(f"Product not found: {product_id}") return PriceCalculationResult(product, None, {}, "Product not found") # Get all relevant prices prices = self._get_relevant_prices(product) # Validate price data if error := self._validate_price_data(prices): logger.error(f"Invalid price data: {error}") logger.error(f"product: {product.id}") return PriceCalculationResult(product, None, {}, error) # Get initial base prices without shipping base_prices = self._get_base_prices(prices, include_shipping=False) # Check price variance if not self._check_price_variance(base_prices): logger.error(f"Price variance exceeds acceptable threshold") logger.error(f"Base prices: {base_prices}") logger.error(f"product: {product.id}") return PriceCalculationResult( product, None, base_prices, "Price variance exceeds acceptable threshold" ) # Handle floor price cases if floor_price := self._handle_floor_price_cases(base_prices): return PriceCalculationResult(product, floor_price, base_prices) # Get max base price and its range max_base_price = max(base_prices.values()) price_range = self._get_price_range(max_base_price) if not price_range: logger.error(f"No valid price range found for price") logger.error(f"Base prices: {base_prices}, max_base_price: {max_base_price}") logger.error(f"product: {product.id}") return PriceCalculationResult( product, None, base_prices, f"No valid price range found for price: {max_base_price}" ) # Include shipping prices if necessary if price_range.include_shipping: base_prices = self._get_base_prices(prices, include_shipping=True) max_base_price = max(base_prices.values()) # Recheck price range with shipping price_range = self._get_price_range(max_base_price) if not price_range: logger.error(f"No valid price range found for price with shipping") logger.error(f"Base prices: {base_prices}, max_base_price: {max_base_price}") logger.error(f"product: {product.id}") return PriceCalculationResult( product, None, base_prices, f"No valid price range found for price with shipping: {max_base_price}" ) # Calculate final price using the price range calculated_price = price_range.calculate_price(max_base_price) # Apply strategy-specific adjustments if strategy == PricingStrategy.AGGRESSIVE: calculated_price *= Decimal('0.95') elif strategy == PricingStrategy.CONSERVATIVE: calculated_price *= Decimal('1.05') debug_base_prices_with_name_string = ", ".join([f"{k}: {v}" for k, v in base_prices.items()]) logger.debug(f"Set price for to {calculated_price.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)} based on {debug_base_prices_with_name_string}") return PriceCalculationResult( product, calculated_price.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP), base_prices ) def _bulk_generate_uuids(self, size: int) -> List[str]: """Generate UUIDs in bulk for better performance.""" return [str(uuid()) for _ in range(size)] def _prepare_price_records(self, df: pd.DataFrame, price_type: str, uuids: List[str]) -> List[Dict]: """Prepare price records in bulk using vectorized operations.""" records = [] df['price_id'] = uuids[:len(df)] df['type'] = price_type # price_type should already be a string value df['date_created'] = datetime.utcnow() return df[['price_id', 'product_id', 'type', 'price', 'date_created']].to_dict('records') def _calculate_suggested_prices_batch(self, product_ids: List[str]) -> Dict[str, float]: """Calculate suggested prices in parallel for a batch of products.""" with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor: future_to_id = { executor.submit(self.calculate_price, str(pid)): pid # Fixed: Ensure string UUID for pid in product_ids } results = {} for future in as_completed(future_to_id): product_id = future_to_id[future] try: result = future.result() if result.success: results[str(product_id)] = float(result.calculated_price) # Fixed: Ensure string UUID except Exception as e: logger.error(f"Failed to calculate price for product {product_id}: {e}") return results def _bulk_insert_prices(self, records: List[Dict]) -> None: """Efficiently insert price records in bulk.""" if not records: return try: df = pd.DataFrame(records) df.to_sql('prices', self.db.bind, if_exists='append', index=False, method='multi', chunksize=self.CHUNK_SIZE) except Exception as e: logger.error(f"Failed to bulk insert prices: {e}") raise def process_pricing_export(self, export_csv: bytes) -> None: """Process pricing export with optimized bulk operations.""" try: # Convert CSV to DataFrame df = self.df_util.csv_bytes_to_df(export_csv) df.columns = df.columns.str.lower().str.replace(' ', '_') # Get product mappings efficiently - SQLite compatible with chunking SQLITE_MAX_VARS = 999 # SQLite parameter limit tcgplayer_ids = df['tcgplayer_id'].tolist() all_product_dfs = [] for i in range(0, len(tcgplayer_ids), SQLITE_MAX_VARS): chunk_ids = tcgplayer_ids[i:i + SQLITE_MAX_VARS] placeholders = ','.join([':id_' + str(j) for j in range(len(chunk_ids))]) product_query = f""" SELECT tcgplayer_id, product_id FROM card_tcgplayer WHERE tcgplayer_id IN ({placeholders}) """ # Create a dictionary of parameters params = {f'id_{j}': id_val for j, id_val in enumerate(chunk_ids)} chunk_df = pd.read_sql( text(product_query), self.db.bind, params=params ) all_product_dfs.append(chunk_df) # Combine all chunks product_df = pd.concat(all_product_dfs) if all_product_dfs else pd.DataFrame() # Merge dataframes efficiently merged_df = pd.merge( df, product_df, on='tcgplayer_id', how='inner' ) # Define price columns mapping - using enum values directly price_columns = { 'tcg_market_price': PriceType.TCG_MARKET.value, 'tcg_direct_low': PriceType.TCG_DIRECT_LOW.value, 'tcg_low_price_with_shipping': PriceType.TCG_LOW_WITH_SHIPPING.value, 'tcg_low_price': PriceType.TCG_LOW.value, 'tcg_marketplace_price': PriceType.TCG_MARKETPLACE.value } # Process each price type in chunks for price_col, price_type in price_columns.items(): valid_prices_df = merged_df[merged_df[price_col].notna()].copy() for chunk_start in range(0, len(valid_prices_df), self.CHUNK_SIZE): chunk_df = valid_prices_df.iloc[chunk_start:chunk_start + self.CHUNK_SIZE].copy() uuids = self._bulk_generate_uuids(len(chunk_df)) chunk_df['price'] = chunk_df[price_col] chunk_df['product_id'] = chunk_df['product_id'].astype(str) # Fixed: Ensure string UUIDs records = self._prepare_price_records(chunk_df, price_type, uuids) self._bulk_insert_prices(records) # Handle suggested prices separately with parallel processing product_ids = merged_df['product_id'].unique() suggested_prices = {} for chunk_start in range(0, len(product_ids), self.CHUNK_SIZE): chunk_ids = product_ids[chunk_start:chunk_start + self.CHUNK_SIZE] chunk_prices = self._calculate_suggested_prices_batch(chunk_ids) suggested_prices.update(chunk_prices) # Create suggested price records if suggested_prices: suggested_df = pd.DataFrame([ {'product_id': str(pid), 'price': price} # Fixed: Ensure string UUIDs for pid, price in suggested_prices.items() ]) uuids = self._bulk_generate_uuids(len(suggested_df)) records = self._prepare_price_records(suggested_df, 'suggested_price', uuids) self._bulk_insert_prices(records) except Exception as e: logger.error(f"Failed to process pricing export: {e}") logger.error(f"Error occurred during price processing: {str(e)}") raise