# Listing metadata: 500 lines, 20 KiB, Python
from dataclasses import dataclass
|
|
from decimal import Decimal, ROUND_HALF_UP
|
|
from enum import Enum
|
|
from typing import Optional, Dict, List, Any
|
|
import pandas as pd
|
|
import logging
|
|
from db.models import Product, Price
|
|
from sqlalchemy.orm import Session
|
|
from uuid import uuid4 as uuid
|
|
from datetime import datetime
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from sqlalchemy import text
|
|
from services.util._dataframe import DataframeUtil
|
|
|
|
|
|
# Module-level logger named after this module; used by the pricing classes below.
logger = logging.getLogger(__name__)
|
|
|
|
class PriceType(str, Enum):
    """Identifiers for the kinds of price recorded for a product.

    Members are also ``str`` instances, so each one compares equal to its
    raw string value.
    """

    TCG_MARKET = "tcg_market_price"
    TCG_DIRECT_LOW = "tcg_direct_low"
    TCG_LOW_WITH_SHIPPING = "tcg_low_price_with_shipping"
    TCG_LOW = "tcg_low_price"
    TCG_MARKETPLACE = "tcg_marketplace_price"
    MY_PRICE = "my_price"
|
|
|
|
class PricingStrategy(str, Enum):
    """Named pricing strategies a caller can request.

    Members are also ``str`` instances and compare equal to their values.
    """

    DEFAULT = "default"
    AGGRESSIVE = "aggressive"
    CONSERVATIVE = "conservative"
|
|
|
|
@dataclass
class PriceRange:
    """A half-open band of base prices together with its markup rule.

    The numeric fields accept any value whose ``str()`` form parses as a
    decimal; they are normalised to ``Decimal`` right after construction.
    """

    min_price: Decimal  # inclusive lower bound of the band
    max_price: Decimal  # exclusive upper bound of the band
    multiplier: Decimal  # factor applied to the base price
    ceiling_price: Optional[Decimal] = None  # optional cap on the marked-up price
    include_shipping: bool = False  # flag stored on the band for callers to read

    def __post_init__(self):
        """Coerce every numeric field to ``Decimal`` via its string form."""
        for field_name in ("min_price", "max_price", "multiplier"):
            raw_value = getattr(self, field_name)
            setattr(self, field_name, Decimal(str(raw_value)))
        if self.ceiling_price is not None:
            self.ceiling_price = Decimal(str(self.ceiling_price))

    def contains_price(self, price: Decimal) -> bool:
        """Return True when ``min_price <= price < max_price``."""
        if not self.min_price <= price:
            return False
        return price < self.max_price

    def calculate_price(self, base_price: Decimal) -> Decimal:
        """Apply the multiplier, cap at the ceiling if one is set, round to cents.

        Rounding uses ``ROUND_HALF_UP`` to two decimal places.
        """
        marked_up = base_price * self.multiplier
        cap = self.ceiling_price
        final_price = marked_up if cap is None else min(marked_up, cap)
        return final_price.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
|
|
|
|
class PricingConfiguration:
    """Central place for the pricing thresholds, multipliers and price bands."""

    # Price thresholds
    FLOOR_PRICE = Decimal('0.35')
    MAX_PRICE = Decimal('100000.00')  # Safety cap for maximum price
    SHIPPING_THRESHOLD = Decimal('5.00')

    # Multipliers, one per price band
    FLOOR_MULT = Decimal('1.25')
    NEAR_FLOOR_MULT = Decimal('1.25')
    UNDER_FIVE_MULT = Decimal('1.25')
    FIVE_TO_TEN_MULT = Decimal('1.15')
    TEN_TO_TWENTYFIVE_MULT = Decimal('1.10')
    TWENTYFIVE_TO_FIFTY_MULT = Decimal('1.05')
    FIFTY_PLUS_MULT = Decimal('1.025')

    # Maximum allowed variance between prices, expressed as a ratio
    MAX_PRICE_VARIANCE = Decimal('0.50')

    @classmethod
    def get_price_ranges(cls) -> list[PriceRange]:
        """Build the ordered list of price bands, lowest band first.

        Each band runs from its lower bound (inclusive) to the next band's
        lower bound (exclusive). The first and last bands have no ceiling.
        """
        # Columns: lower bound, upper bound, multiplier, ceiling, include shipping
        band_table = (
            (Decimal('0'), cls.FLOOR_PRICE, cls.FLOOR_MULT, None, False),
            (cls.FLOOR_PRICE, Decimal('5'), cls.UNDER_FIVE_MULT, Decimal('4.99'), False),
            (Decimal('5'), Decimal('10'), cls.FIVE_TO_TEN_MULT, Decimal('9.99'), True),
            (Decimal('10'), Decimal('25'), cls.TEN_TO_TWENTYFIVE_MULT, Decimal('24.99'), True),
            (Decimal('25'), Decimal('50'), cls.TWENTYFIVE_TO_FIFTY_MULT, Decimal('49.99'), True),
            (Decimal('50'), cls.MAX_PRICE, cls.FIFTY_PLUS_MULT, None, True),
        )
        return [
            PriceRange(
                min_price=lower,
                max_price=upper,
                multiplier=factor,
                ceiling_price=cap,
                include_shipping=with_shipping,
            )
            for lower, upper, factor, cap, with_shipping in band_table
        ]
|
|
|
|
class PriceCalculationResult:
    """Outcome of one price calculation: the product, the price and its inputs."""

    def __init__(
        self,
        product: Product,
        calculated_price: Optional[Decimal],
        base_prices: Dict[str, Decimal],
        error: Optional[str] = None,
    ):
        # Stored as given; no copying or validation happens here.
        self.product = product
        self.calculated_price = calculated_price
        self.base_prices = base_prices
        self.error = error

    @property
    def success(self) -> bool:
        """True only when a price was produced and no error was recorded."""
        if self.error is not None:
            return False
        return self.calculated_price is not None

    @property
    def max_base_price(self) -> Optional[Decimal]:
        """Highest value in ``base_prices``, or None when it is empty."""
        if not self.base_prices:
            return None
        return max(self.base_prices.values())
|
|
|
|
|
|
class PricingService:
    """Computes suggested product prices and imports pricing exports."""

    # Rows per bulk-insert chunk and product ids per pricing batch.
    CHUNK_SIZE = 5000
    # Number of threads used when pricing a batch of products.
    MAX_WORKERS = 4

    def __init__(self, db: Session):
        """Keep the session and cache the configured price bands.

        Args:
            db: SQLAlchemy session used for every query and insert.
        """
        self.db = db
        self.df_util = DataframeUtil()
        # The configuration class itself is stored, not an instance of it.
        self.config = PricingConfiguration
        self.price_ranges = self.config.get_price_ranges()
|
|
|
|
def get_product_by_id(self, product_id: str) -> Optional[Product]:
    """Return the product whose id matches ``product_id``, or None.

    Args:
        product_id: Product identifier; it is converted to ``str`` before
            being compared with ``Product.id``.

    Returns:
        The first matching ``Product``, or None when there is no match.
    """
    # One query fetching at most one row. The earlier version ran the same
    # query twice and loaded every matching row each time.
    return (
        self.db.query(Product)
        .filter(Product.id == str(product_id))
        .first()
    )
|
|
|
|
def get_latest_price_for_product(self, product: Product, price_type: PriceType) -> Optional[Price]:
    """Return the most recently created price of one type for a product.

    Args:
        product: Product whose prices are searched (matched on ``str(product.id)``).
        price_type: Price kind to look for; its ``.value`` is compared with ``Price.type``.

    Returns:
        The newest matching ``Price`` by ``date_created``, or None if none exists.
    """
    # first() fetches only the newest row instead of loading the whole
    # price history just to read element 0.
    return (
        self.db.query(Price)
        .filter(
            Price.product_id == str(product.id),
            Price.type == price_type.value,
        )
        .order_by(Price.date_created.desc())
        .first()
    )
|
|
|
|
def get_historical_prices_for_product(
    self, product: Product, price_type: Optional[PriceType] = None
) -> dict[PriceType, list[Price]]:
    """Return a product's prices, newest first, grouped by price type.

    Args:
        product: Product whose prices are fetched (matched on ``str(product.id)``).
        price_type: When given, only prices of this type are fetched.

    Returns:
        With ``price_type``: a one-entry dict mapping it to its prices.
        Without: a dict with one (possibly empty) list per ``PriceType``
        member. Rows whose type is not a ``PriceType`` value are skipped.
    """
    query = self.db.query(Price).filter(Price.product_id == str(product.id))
    if price_type is not None:
        query = query.filter(Price.type == price_type.value)

    prices = query.order_by(Price.date_created.desc()).all()
    if price_type is not None:
        return {price_type: prices}

    grouped = {t: [] for t in PriceType}
    for price in prices:
        try:
            key = PriceType(price.type)
        except ValueError:
            # This service itself stores rows typed 'suggested_price', which
            # is not a PriceType member; such rows used to abort the whole
            # lookup with ValueError.
            logger.debug(f"Skipping price with unrecognised type: {price.type}")
            continue
        grouped[key].append(price)
    return grouped
|
|
|
|
def _validate_price_data(self, prices: dict[str, Optional[Price]]) -> Optional[str]:
    """Return an error message for unusable price data, or None when it is fine.

    Entries whose value is None are ignored. The data is rejected when no
    entries remain, when any price is negative, or when any price is above
    the configured ``MAX_PRICE``.
    """
    present = [entry for entry in prices.values() if entry is not None]
    if not present:
        return "No valid price data available"

    for entry in present:
        if entry.price < 0:
            return f"Negative price found: {entry.price}"
        if entry.price > self.config.MAX_PRICE:
            return f"Price exceeds maximum allowed: {entry.price}"

    return None
|
|
|
|
def _check_price_variance(self, prices: Dict[str, Decimal]) -> bool:
    """Report whether the spread between the lowest and highest price is acceptable.

    The spread is acceptable when ``highest / lowest`` is at most
    ``1 + MAX_PRICE_VARIANCE``. An empty mapping is accepted; a lowest
    price of zero is rejected because the ratio is undefined.
    """
    if not prices:
        return True

    amounts = list(prices.values())
    lowest = min(amounts)
    highest = max(amounts)
    if lowest == 0:
        return False

    allowed_ratio = 1 + self.config.MAX_PRICE_VARIANCE
    return highest / lowest <= allowed_ratio
|
|
|
|
def _get_relevant_prices(self, product: Product) -> dict[str, Optional[Price]]:
    """Look up the latest price of each type that feeds the calculation.

    Returns a dict keyed by price-type value; a value is None when the
    product has no price of that type.
    """
    wanted_types = (
        PriceType.TCG_LOW,
        PriceType.TCG_DIRECT_LOW,
        PriceType.TCG_MARKET,
        PriceType.TCG_LOW_WITH_SHIPPING,
    )
    return {
        kind.value: self.get_latest_price_for_product(product, kind)
        for kind in wanted_types
    }
|
|
|
|
def _get_base_prices(
    self, prices: dict[str, Price], include_shipping: bool = False
) -> Dict[str, Decimal]:
    """Collect the available base prices as ``Decimal`` values.

    The low, direct-low and market prices are always considered; the
    low-with-shipping price only when ``include_shipping`` is true.
    Missing or falsy entries are left out of the result.
    """
    considered = [
        PriceType.TCG_LOW,
        PriceType.TCG_DIRECT_LOW,
        PriceType.TCG_MARKET,
    ]
    if include_shipping:
        considered.append(PriceType.TCG_LOW_WITH_SHIPPING)

    base_prices = {}
    for kind in considered:
        record = prices.get(kind.value)
        if record:
            # Convert via str() so the Decimal is built from the printed value.
            base_prices[kind.value] = Decimal(str(record.price))
    return base_prices
|
|
|
|
def _get_price_range(self, price: Decimal) -> Optional[PriceRange]:
    """Return the first configured band that contains ``price``, or None."""
    matching = (band for band in self.price_ranges if band.contains_price(price))
    return next(matching, None)
|
|
|
|
def _handle_floor_price_cases(
    self, base_prices: Dict[str, Decimal]
) -> Optional[Decimal]:
    """Price products whose base prices sit at or below the floor.

    Args:
        base_prices: Base prices keyed by price-type value.

    Returns:
        ``FLOOR_PRICE`` when every base price is below the floor (this
        includes an empty mapping, since ``all()`` of nothing is true);
        the highest base price times ``NEAR_FLOOR_MULT``, rounded to cents,
        when only some are below the floor; otherwise None.
    """
    floor = self.config.FLOOR_PRICE
    below_floor = [price < floor for price in base_prices.values()]

    if all(below_floor):
        return floor

    if any(below_floor):
        near_floor = max(base_prices.values()) * self.config.NEAR_FLOOR_MULT
        # Round to cents like the price-band path does; the raw product can
        # carry four decimal places (0.37 * 1.25 = 0.4625).
        return near_floor.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)

    return None
|
|
|
|
def calculate_price(
    self, product_id: str, strategy: PricingStrategy = PricingStrategy.DEFAULT
) -> PriceCalculationResult:
    """Calculate the final price for a product using the given strategy.

    Steps: load the product and its latest prices, validate them, reject
    inconsistent base prices, apply the floor-price rules, otherwise pick
    the price band of the highest base price (re-picking it with the
    with-shipping price when the band asks for shipping), apply the band's
    multiplier and ceiling, then the strategy factor.

    Args:
        product_id: Product identifier; converted to ``str`` for the lookup.
        strategy: AGGRESSIVE multiplies the band price by 0.95, CONSERVATIVE
            by 1.05, DEFAULT leaves it unchanged. The strategy factor is not
            applied to floor-price results.

    Returns:
        A ``PriceCalculationResult``. On failure ``calculated_price`` is
        None and ``error`` holds the reason. Prices are rounded to cents.
    """
    cent = Decimal('0.01')

    product = self.get_product_by_id(str(product_id))
    if not product:
        logger.error(f"Product not found: {product_id}")
        return PriceCalculationResult(product, None, {}, "Product not found")

    prices = self._get_relevant_prices(product)

    if error := self._validate_price_data(prices):
        logger.error(f"Invalid price data: {error}")
        logger.error(f"product: {product.id}")
        return PriceCalculationResult(product, None, {}, error)

    # Initial base prices, without the with-shipping price.
    base_prices = self._get_base_prices(prices, include_shipping=False)

    # Validation passes when only a with-shipping price exists, which leaves
    # no base price here. Without this guard the floor-price rule treated
    # the empty set as "everything below the floor" and priced the product
    # at the floor.
    if not base_prices:
        error = "No base price data available"
        logger.error(error)
        logger.error(f"product: {product.id}")
        return PriceCalculationResult(product, None, base_prices, error)

    if not self._check_price_variance(base_prices):
        logger.error("Price variance exceeds acceptable threshold")
        logger.error(f"Base prices: {base_prices}")
        logger.error(f"product: {product.id}")
        return PriceCalculationResult(
            product, None, base_prices,
            "Price variance exceeds acceptable threshold"
        )

    if floor_price := self._handle_floor_price_cases(base_prices):
        # Round here as well so this path always yields whole cents.
        return PriceCalculationResult(
            product,
            floor_price.quantize(cent, rounding=ROUND_HALF_UP),
            base_prices
        )

    max_base_price = max(base_prices.values())
    price_range = self._get_price_range(max_base_price)

    if not price_range:
        logger.error("No valid price range found for price")
        logger.error(f"Base prices: {base_prices}, max_base_price: {max_base_price}")
        logger.error(f"product: {product.id}")
        return PriceCalculationResult(
            product, None, base_prices,
            f"No valid price range found for price: {max_base_price}"
        )

    if price_range.include_shipping:
        # The with-shipping price may raise the maximum into a higher band,
        # so the band is looked up again.
        base_prices = self._get_base_prices(prices, include_shipping=True)
        max_base_price = max(base_prices.values())
        price_range = self._get_price_range(max_base_price)

        if not price_range:
            logger.error("No valid price range found for price with shipping")
            logger.error(f"Base prices: {base_prices}, max_base_price: {max_base_price}")
            logger.error(f"product: {product.id}")
            return PriceCalculationResult(
                product, None, base_prices,
                f"No valid price range found for price with shipping: {max_base_price}"
            )

    calculated_price = price_range.calculate_price(max_base_price)

    if strategy == PricingStrategy.AGGRESSIVE:
        calculated_price *= Decimal('0.95')
    elif strategy == PricingStrategy.CONSERVATIVE:
        calculated_price *= Decimal('1.05')

    final_price = calculated_price.quantize(cent, rounding=ROUND_HALF_UP)

    debug_base_prices_with_name_string = ", ".join([f"{k}: {v}" for k, v in base_prices.items()])
    # The message previously read "Set price for to ..." with no product in it.
    logger.debug(f"Set price for {product.id} to {final_price} based on {debug_base_prices_with_name_string}")

    return PriceCalculationResult(product, final_price, base_prices)
|
|
|
|
def _bulk_generate_uuids(self, size: int) -> List[str]:
    """Return ``size`` newly generated UUIDs, each rendered as a string."""
    fresh_ids = (uuid() for _ in range(size))
    return list(map(str, fresh_ids))
|
|
|
|
def _prepare_price_records(self, df: pd.DataFrame, price_type: str, uuids: List[str]) -> List[Dict]:
    """Turn a frame of prices into insert-ready record dicts.

    Args:
        df: Frame with at least ``product_id`` and ``price`` columns. It is
            not modified.
        price_type: String stored in every record's ``type`` field.
        uuids: Ids to assign, one per row in row order; entries beyond
            ``len(df)`` are ignored.

    Returns:
        One dict per row with the keys ``price_id``, ``product_id``,
        ``type``, ``price`` and ``date_created`` (a naive UTC timestamp
        shared by the whole batch).
    """
    # Local import: only `datetime` is imported at module level.
    from datetime import timezone

    # Same naive-UTC value as datetime.utcnow(), which is deprecated.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None)

    # assign() works on a copy, so the caller's frame keeps its columns.
    stamped = df.assign(
        price_id=uuids[:len(df)],
        type=price_type,
        date_created=created_at,
    )
    return stamped[['price_id', 'product_id', 'type', 'price', 'date_created']].to_dict('records')
|
|
|
|
def _calculate_suggested_prices_batch(self, product_ids: List[str]) -> Dict[str, float]:
    """Price a batch of products on a thread pool.

    Returns a dict mapping ``str(product_id)`` to the calculated price as a
    float. Products whose calculation did not succeed or raised are left
    out; raised exceptions are logged.

    NOTE(review): every worker calls ``self.calculate_price`` and therefore
    shares ``self.db``; confirm the session is safe to use from several
    threads.
    """
    suggested: Dict[str, float] = {}

    with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as pool:
        pending = {}
        for pid in product_ids:
            pending[pool.submit(self.calculate_price, str(pid))] = pid

        for finished in as_completed(pending):
            pid = pending[finished]
            try:
                outcome = finished.result()
                if outcome.success:
                    suggested[str(pid)] = float(outcome.calculated_price)
            except Exception as e:
                logger.error(f"Failed to calculate price for product {pid}: {e}")

    return suggested
|
|
|
|
def _bulk_insert_prices(self, records: List[Dict]) -> None:
    """Append price records to the ``prices`` table; do nothing for an empty list.

    Rows are written with pandas ``to_sql`` using multi-row inserts in
    chunks of ``CHUNK_SIZE``. Any failure is logged and re-raised.
    """
    if not records:
        return

    try:
        frame = pd.DataFrame(records)
        frame.to_sql(
            'prices',
            self.db.bind,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=self.CHUNK_SIZE,
        )
    except Exception as e:
        logger.error(f"Failed to bulk insert prices: {e}")
        raise
|
|
|
|
def process_pricing_export(self, export_csv: bytes) -> None:
    """Import a pricing export and store its prices plus suggested prices.

    The CSV is loaded into a frame (column names lower-cased, spaces turned
    into underscores), its ``tcgplayer_id`` values are resolved to product
    ids through ``card_tcgplayer``, every non-null price in the known price
    columns is inserted, and a suggested price is calculated and inserted
    for each matched product.

    Args:
        export_csv: Raw CSV bytes of the export.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        df = self.df_util.csv_bytes_to_df(export_csv)
        df.columns = df.columns.str.lower().str.replace(' ', '_')

        SQLITE_MAX_VARS = 999  # SQLite parameter limit

        # Distinct, non-null ids only: an id repeated in the export could
        # land in two lookup chunks, come back twice and duplicate every
        # merged row (and so every inserted price).
        tcgplayer_ids = df['tcgplayer_id'].dropna().unique().tolist()
        if not tcgplayer_ids:
            # Previously an empty export crashed in the merge, because the
            # placeholder frame had no 'tcgplayer_id' column.
            logger.warning("Pricing export contains no tcgplayer ids; nothing to process")
            return

        # Resolve ids in chunks to stay under the bound-parameter limit.
        all_product_dfs = []
        for i in range(0, len(tcgplayer_ids), SQLITE_MAX_VARS):
            chunk_ids = tcgplayer_ids[i:i + SQLITE_MAX_VARS]
            params = {f'id_{j}': id_val for j, id_val in enumerate(chunk_ids)}
            placeholders = ','.join(f':{name}' for name in params)
            product_query = f"""
                SELECT tcgplayer_id, product_id
                FROM card_tcgplayer
                WHERE tcgplayer_id IN ({placeholders})
            """
            all_product_dfs.append(
                pd.read_sql(text(product_query), self.db.bind, params=params)
            )

        product_df = pd.concat(all_product_dfs, ignore_index=True)
        merged_df = pd.merge(df, product_df, on='tcgplayer_id', how='inner')

        # Export column name -> stored price type
        price_columns = {
            'tcg_market_price': PriceType.TCG_MARKET.value,
            'tcg_direct_low': PriceType.TCG_DIRECT_LOW.value,
            'tcg_low_price_with_shipping': PriceType.TCG_LOW_WITH_SHIPPING.value,
            'tcg_low_price': PriceType.TCG_LOW.value,
            'tcg_marketplace_price': PriceType.TCG_MARKETPLACE.value
        }

        for price_col, price_type in price_columns.items():
            if price_col not in merged_df.columns:
                # An export without this column should not abort the import.
                logger.warning(f"Pricing export has no '{price_col}' column; skipping it")
                continue

            valid_prices_df = merged_df[merged_df[price_col].notna()]

            for chunk_start in range(0, len(valid_prices_df), self.CHUNK_SIZE):
                chunk_df = valid_prices_df.iloc[chunk_start:chunk_start + self.CHUNK_SIZE].copy()
                uuids = self._bulk_generate_uuids(len(chunk_df))

                chunk_df['price'] = chunk_df[price_col]
                chunk_df['product_id'] = chunk_df['product_id'].astype(str)
                records = self._prepare_price_records(chunk_df, price_type, uuids)
                self._bulk_insert_prices(records)

        # Suggested prices are calculated after the inserts above, in batches.
        product_ids = merged_df['product_id'].unique()
        suggested_prices = {}

        for chunk_start in range(0, len(product_ids), self.CHUNK_SIZE):
            chunk_ids = product_ids[chunk_start:chunk_start + self.CHUNK_SIZE]
            suggested_prices.update(self._calculate_suggested_prices_batch(chunk_ids))

        if suggested_prices:
            suggested_df = pd.DataFrame([
                {'product_id': str(pid), 'price': price}
                for pid, price in suggested_prices.items()
            ])

            uuids = self._bulk_generate_uuids(len(suggested_df))
            records = self._prepare_price_records(suggested_df, 'suggested_price', uuids)
            self._bulk_insert_prices(records)

    except Exception as e:
        # Logged once, with the traceback; the old code logged the same
        # message twice at error level.
        logger.exception(f"Failed to process pricing export: {e}")
        raise