giga_tcg/app/services/unholy_pricing.py
zman cc365970a9 Squashed commit of the following:
commit 893b229cc6b35c09181a84050f34fb79024e41c2
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 22:14:08 2025 -0500

    j

commit 06f539aea2f4fff9da7038d43d0de553c4423796
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:55:30 2025 -0500

    fk

commit d0c2960ec9f334448d2eb3573b9d7817482abf46
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:50:53 2025 -0500

    frick

commit 6b1362c166fc5f51c3bcf316a99116f0d11074a5
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:49:40 2025 -0500

    database

commit 8cadc6df4c817d9d05503807e56287fd00e5e939
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:38:09 2025 -0500

    asdf

commit 1ca6f9868452e34143b8df4a412be35e6902a31e
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:32:50 2025 -0500

    fffff

commit 8bb337a9c35e830ef9ce3dac0a0f2df3fe9bc5a0
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:31:13 2025 -0500

    ffff

commit 65aba280c55fa09c6a37f688f485efab1f70792b
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:26:16 2025 -0500

    aa

commit 59ef03a59ee4a15c30e080a1aef7c31c0214a2e3
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:24:21 2025 -0500

    asdf

commit f44d5740fc9315ccb0792ecac3e8ec9f28f171be
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:23:32 2025 -0500

    aaa

commit 13c96b164316b4908d9d01e454cbdc9103157558
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:18:54 2025 -0500

    sdf

commit 949c795fd13d93c9618613740fb093f6bb7b7710
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 21:17:53 2025 -0500

    asdf

commit 8c3cd423fe228e8aff112a050170246a5fc9f8bd
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 20:56:01 2025 -0500

    app2

commit 78eafc739ebb7f100f657964b3ad8f4937a4046b
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 20:54:55 2025 -0500

    app

commit dc47eced143e77ebec415bdfbe209d9466b7bcf1
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 20:43:15 2025 -0500

    asdfasdfasdf

commit e24bcae88cf8c14ea543f49b639b2976c627d201
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 20:39:44 2025 -0500

    a

commit c894451bfe790c97ac0e01085615d7c7288a39da
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 20:38:20 2025 -0500

    req

commit 3d09869562a96b5adc7c4be279bc8c003bbb37b2
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 20:33:27 2025 -0500

    wrong number = code dont work lol i love computers

commit 4c93a1271b8aea159cf53f8d7879b00513886d6f
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 20:29:39 2025 -0500

    q

commit 1f5361da88fe3903a1e92a345fa56bb390f69d92
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 18:27:20 2025 -0500

    same as original code now -5 days of my life

commit 511b070cbbcd29b4e784e9a09d58481e50e6e82f
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 13:52:28 2025 -0500

    pricey worky

commit 964fdd641b63530c59e038ebc7d1e01e9570d75c
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Fri Feb 7 11:37:29 2025 -0500

    prep for pricing service work

commit a78c3bcba303c2605b6277c1db33b155abe4db1b
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Wed Feb 5 21:51:22 2025 -0500

    more stuff yay

commit bd9cfca7a95c89b2140eec57bf52bc84432b9a4e
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Tue Feb 4 22:30:33 2025 -0500

    GIGA FIXED EVERYTHING OMG

commit 85510a46713e0ac660e70c7befb4e94ccf11912e
Author: zman <joshua.k.rzemien@gmail.com>
Date:   Tue Feb 4 00:01:34 2025 -0500

    data model change and some new services
2025-02-07 22:20:34 -05:00

500 lines
20 KiB
Python

from dataclasses import dataclass
from decimal import Decimal, ROUND_HALF_UP
from enum import Enum
from typing import Optional, Dict, List, Any
import pandas as pd
import logging
from db.models import Product, Price
from sqlalchemy.orm import Session
from uuid import uuid4 as uuid
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy import text
from services.util._dataframe import DataframeUtil
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class PriceType(str, Enum):
    """Kinds of price handled by this module.

    Each member is also a ``str``, so it compares equal to its raw value.
    The raw value is what the queries in this module compare against
    ``Price.type``.
    """

    # Declaration order matters: iterating the enum yields members in
    # exactly this order.
    TCG_MARKET = 'tcg_market_price'
    TCG_DIRECT_LOW = 'tcg_direct_low'
    TCG_LOW_WITH_SHIPPING = 'tcg_low_price_with_shipping'
    TCG_LOW = 'tcg_low_price'
    TCG_MARKETPLACE = 'tcg_marketplace_price'
    MY_PRICE = 'my_price'
class PricingStrategy(str, Enum):
    """Selectable adjustment applied at the end of a price calculation.

    ``PricingService.calculate_price`` multiplies its result by 0.95 for
    ``AGGRESSIVE`` and by 1.05 for ``CONSERVATIVE``; ``DEFAULT`` leaves the
    result unchanged.
    """

    DEFAULT = 'default'
    AGGRESSIVE = 'aggressive'
    CONSERVATIVE = 'conservative'
@dataclass
class PriceRange:
    """A half-open price band ``[min_price, max_price)`` with its pricing rule.

    Attributes are normalised to ``Decimal`` after construction, so callers
    may pass ints, floats or numeric strings.
    """

    min_price: Decimal
    max_price: Decimal
    multiplier: Decimal
    # Optional upper bound applied to the multiplied price.
    ceiling_price: Optional[Decimal] = None
    # Whether prices in this band should also consider the
    # low-price-with-shipping figure (read by PricingService).
    include_shipping: bool = False

    def __post_init__(self):
        # Going through str() avoids carrying binary float noise into the
        # Decimal values.
        for attr in ('min_price', 'max_price', 'multiplier'):
            setattr(self, attr, Decimal(str(getattr(self, attr))))
        if self.ceiling_price is not None:
            self.ceiling_price = Decimal(str(self.ceiling_price))

    def contains_price(self, price: Decimal) -> bool:
        """Return True when ``min_price <= price < max_price``."""
        at_or_above_min = self.min_price <= price
        return at_or_above_min and price < self.max_price

    def calculate_price(self, base_price: Decimal) -> Decimal:
        """Apply the multiplier, clamp to the ceiling, and round to cents.

        Rounding uses ROUND_HALF_UP to two decimal places.
        """
        scaled = base_price * self.multiplier
        cap = self.ceiling_price
        if cap is not None and cap < scaled:
            scaled = cap
        return scaled.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
class PricingConfiguration:
    """Static pricing thresholds, multipliers and the price-range table."""

    # --- Thresholds -------------------------------------------------------
    FLOOR_PRICE = Decimal('0.35')
    MAX_PRICE = Decimal('100000.00')  # Safety cap for maximum price
    # NOTE(review): not referenced elsewhere in the visible code.
    SHIPPING_THRESHOLD = Decimal('5.00')

    # --- Multipliers ------------------------------------------------------
    FLOOR_MULT = Decimal('1.25')
    NEAR_FLOOR_MULT = Decimal('1.25')
    UNDER_FIVE_MULT = Decimal('1.25')
    FIVE_TO_TEN_MULT = Decimal('1.15')
    TEN_TO_TWENTYFIVE_MULT = Decimal('1.10')
    TWENTYFIVE_TO_FIFTY_MULT = Decimal('1.05')
    FIFTY_PLUS_MULT = Decimal('1.025')

    # --- Variance ---------------------------------------------------------
    # Maximum allowed variance between prices as a ratio: the highest base
    # price may be at most (1 + this value) times the lowest.
    MAX_PRICE_VARIANCE = Decimal('0.50')

    @classmethod
    def get_price_ranges(cls) -> list[PriceRange]:
        """Build the ordered list of price bands and their rules.

        Bands are contiguous and half-open, running from 0 up to (but not
        including) ``MAX_PRICE``.
        """
        # Columns: min_price, max_price, multiplier, ceiling_price,
        # include_shipping.
        bands = (
            (Decimal('0'), cls.FLOOR_PRICE, cls.FLOOR_MULT, None, False),
            (cls.FLOOR_PRICE, Decimal('5'), cls.UNDER_FIVE_MULT, Decimal('4.99'), False),
            (Decimal('5'), Decimal('10'), cls.FIVE_TO_TEN_MULT, Decimal('9.99'), True),
            (Decimal('10'), Decimal('25'), cls.TEN_TO_TWENTYFIVE_MULT, Decimal('24.99'), True),
            (Decimal('25'), Decimal('50'), cls.TWENTYFIVE_TO_FIFTY_MULT, Decimal('49.99'), True),
            (Decimal('50'), cls.MAX_PRICE, cls.FIFTY_PLUS_MULT, None, True),
        )
        return [
            PriceRange(
                min_price=lower,
                max_price=upper,
                multiplier=factor,
                ceiling_price=cap,
                include_shipping=with_shipping,
            )
            for lower, upper, factor, cap, with_shipping in bands
        ]
class PriceCalculationResult:
    """Outcome of one price calculation: the price, its inputs, or an error."""

    def __init__(
        self,
        product: Product,
        calculated_price: Optional[Decimal],
        base_prices: Dict[str, Decimal],
        error: Optional[str] = None
    ):
        # Plain value holder: nothing is validated or copied here.
        self.product = product
        self.calculated_price = calculated_price
        self.base_prices = base_prices
        self.error = error

    @property
    def success(self) -> bool:
        """True only when a price was produced and no error was recorded."""
        if self.error is not None:
            return False
        return self.calculated_price is not None

    @property
    def max_base_price(self) -> Optional[Decimal]:
        """Largest value in ``base_prices``, or None when it is empty."""
        if not self.base_prices:
            return None
        return max(self.base_prices.values())
class PricingService:
    """Price lookups, suggested-price calculation and bulk price import.

    The service reads ``Product`` and ``Price`` rows through the SQLAlchemy
    session passed to the constructor, and writes new price rows to the
    ``prices`` table through the session's bind using pandas.
    """

    CHUNK_SIZE = 5000  # Configurable batch size
    MAX_WORKERS = 4  # Configurable worker count
    SQLITE_MAX_VARS = 999  # SQLite parameter limit for one statement

    def __init__(self, db: Session):
        self.db = db
        self.df_util = DataframeUtil()
        self.config = PricingConfiguration
        self.price_ranges = self.config.get_price_ranges()

    @staticmethod
    def _round_to_cents(value: Decimal) -> Decimal:
        """Round a Decimal to two places using ROUND_HALF_UP."""
        return value.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)

    def get_product_by_id(self, product_id: str) -> Optional[Product]:
        """Return the product with the given id, or None if there is none."""
        # first() issues a single limited query, instead of running the same
        # query twice and loading every matching row.
        return (
            self.db.query(Product)
            .filter(Product.id == str(product_id))
            .first()
        )

    def get_latest_price_for_product(
        self, product: Product, price_type: PriceType
    ) -> Optional[Price]:
        """Return the most recently created price of one type, or None."""
        return (
            self.db.query(Price)
            .filter(
                Price.product_id == str(product.id),
                Price.type == price_type.value,
            )
            .order_by(Price.date_created.desc())
            .first()
        )

    def get_historical_prices_for_product(
        self, product: Product, price_type: Optional[PriceType] = None
    ) -> dict[PriceType, list[Price]]:
        """Return a product's prices, newest first, grouped by price type.

        With ``price_type`` given, the result has that single key. Otherwise
        every ``PriceType`` member is present as a key (possibly with an
        empty list). Rows whose stored type is not a ``PriceType`` value are
        skipped.
        """
        query = self.db.query(Price).filter(Price.product_id == str(product.id))

        if price_type is not None:
            query = query.filter(Price.type == price_type.value)
            return {price_type: query.order_by(Price.date_created.desc()).all()}

        grouped: dict[PriceType, list[Price]] = {t: [] for t in PriceType}
        for price in query.order_by(Price.date_created.desc()).all():
            try:
                key = PriceType(price.type)
            except ValueError:
                # process_pricing_export stores 'suggested_price' rows, which
                # have no PriceType member; they must not abort the listing.
                logger.debug(f"Skipping price with unknown type: {price.type}")
                continue
            grouped[key].append(price)
        return grouped

    def _validate_price_data(self, prices: dict[str, Optional[Price]]) -> Optional[str]:
        """Return an error message for unusable price data, else None.

        Data is rejected when no price is present, when any price is
        negative, or when any price is above ``MAX_PRICE``.
        """
        valid_prices = [p for p in prices.values() if p is not None]
        if not valid_prices:
            return "No valid price data available"

        for price in valid_prices:
            if price.price < 0:
                return f"Negative price found: {price.price}"
            if price.price > self.config.MAX_PRICE:
                return f"Price exceeds maximum allowed: {price.price}"
        return None

    def _check_price_variance(self, prices: Dict[str, Decimal]) -> bool:
        """Return True when max/min of the prices is within the allowed ratio.

        An empty mapping passes. A minimum of zero fails, since the ratio is
        undefined.
        """
        if not prices:
            return True

        lowest = min(prices.values())
        highest = max(prices.values())
        if lowest == 0:
            return False
        return highest / lowest <= (1 + self.config.MAX_PRICE_VARIANCE)

    def _get_relevant_prices(self, product: Product) -> dict[str, Optional[Price]]:
        """Fetch the latest price of each type used by the calculation."""
        relevant_types = (
            PriceType.TCG_LOW,
            PriceType.TCG_DIRECT_LOW,
            PriceType.TCG_MARKET,
            PriceType.TCG_LOW_WITH_SHIPPING,
        )
        return {
            price_type.value: self.get_latest_price_for_product(product, price_type)
            for price_type in relevant_types
        }

    def _get_base_prices(
        self, prices: dict[str, Optional[Price]], include_shipping: bool = False
    ) -> Dict[str, Decimal]:
        """Convert the available prices to Decimals, keyed by price type.

        Missing (None) entries are left out. The low-with-shipping price is
        included only when ``include_shipping`` is True.
        """
        wanted = [PriceType.TCG_LOW, PriceType.TCG_DIRECT_LOW, PriceType.TCG_MARKET]
        if include_shipping:
            wanted.append(PriceType.TCG_LOW_WITH_SHIPPING)

        base_prices: Dict[str, Decimal] = {}
        for price_type in wanted:
            record = prices.get(price_type.value)
            if record is not None:
                base_prices[price_type.value] = Decimal(str(record.price))
        return base_prices

    def _get_price_range(self, price: Decimal) -> Optional[PriceRange]:
        """Return the first configured range containing ``price``, or None."""
        return next(
            (candidate for candidate in self.price_ranges
             if candidate.contains_price(price)),
            None,
        )

    def _handle_floor_price_cases(
        self, base_prices: Dict[str, Decimal]
    ) -> Optional[Decimal]:
        """Return a price for products at or near the floor, else None.

        * every base price below the floor -> the floor price itself
        * some base price below the floor  -> highest base price times
          ``NEAR_FLOOR_MULT``, rounded to cents
        * otherwise (including no base prices at all) -> None
        """
        # all() is True for an empty iterable; without this guard a product
        # with no base prices would be priced at the floor.
        if not base_prices:
            return None

        floor = self.config.FLOOR_PRICE
        values = list(base_prices.values())
        below_floor = [value < floor for value in values]

        if all(below_floor):
            return floor
        if any(below_floor):
            # Round like every other calculated price, so no sub-cent value
            # is returned.
            return self._round_to_cents(max(values) * self.config.NEAR_FLOOR_MULT)
        return None

    def calculate_price(
        self, product_id: str, strategy: PricingStrategy = PricingStrategy.DEFAULT
    ) -> PriceCalculationResult:
        """Calculate the suggested price for one product.

        Steps: load the product and its latest prices, validate them, reject
        price sets that disagree too much, apply the floor rules, then apply
        the rule of the range containing the highest base price, and finally
        the strategy adjustment. Failures are reported through the returned
        result's ``error`` field rather than raised.
        """
        product = self.get_product_by_id(str(product_id))
        if not product:
            logger.error(f"Product not found: {product_id}")
            return PriceCalculationResult(product, None, {}, "Product not found")

        prices = self._get_relevant_prices(product)

        if error := self._validate_price_data(prices):
            logger.error(f"Invalid price data: {error}")
            logger.error(f"product: {product.id}")
            return PriceCalculationResult(product, None, {}, error)

        # Initial base prices never include shipping.
        base_prices = self._get_base_prices(prices, include_shipping=False)
        if not base_prices:
            # Validation passes when only the with-shipping price exists, but
            # there is then nothing to base a price on.
            error = "No base price data available"
            logger.error(error)
            logger.error(f"product: {product.id}")
            return PriceCalculationResult(product, None, {}, error)

        if not self._check_price_variance(base_prices):
            logger.error("Price variance exceeds acceptable threshold")
            logger.error(f"Base prices: {base_prices}")
            logger.error(f"product: {product.id}")
            return PriceCalculationResult(
                product, None, base_prices,
                "Price variance exceeds acceptable threshold"
            )

        floor_price = self._handle_floor_price_cases(base_prices)
        if floor_price is not None:
            return PriceCalculationResult(product, floor_price, base_prices)

        max_base_price = max(base_prices.values())
        price_range = self._get_price_range(max_base_price)
        if not price_range:
            logger.error("No valid price range found for price")
            logger.error(f"Base prices: {base_prices}, max_base_price: {max_base_price}")
            logger.error(f"product: {product.id}")
            return PriceCalculationResult(
                product, None, base_prices,
                f"No valid price range found for price: {max_base_price}"
            )

        if price_range.include_shipping:
            # The with-shipping price may raise the maximum into another
            # range, so the range is looked up again.
            base_prices = self._get_base_prices(prices, include_shipping=True)
            max_base_price = max(base_prices.values())
            price_range = self._get_price_range(max_base_price)
            if not price_range:
                logger.error("No valid price range found for price with shipping")
                logger.error(f"Base prices: {base_prices}, max_base_price: {max_base_price}")
                logger.error(f"product: {product.id}")
                return PriceCalculationResult(
                    product, None, base_prices,
                    f"No valid price range found for price with shipping: {max_base_price}"
                )

        calculated_price = price_range.calculate_price(max_base_price)

        # Strategy adjustments are applied after the range ceiling.
        if strategy == PricingStrategy.AGGRESSIVE:
            calculated_price *= Decimal('0.95')
        elif strategy == PricingStrategy.CONSERVATIVE:
            calculated_price *= Decimal('1.05')

        final_price = self._round_to_cents(calculated_price)
        debug_base_prices_with_name_string = ", ".join(
            f"{k}: {v}" for k, v in base_prices.items()
        )
        logger.debug(f"Set price for to {final_price} based on {debug_base_prices_with_name_string}")
        return PriceCalculationResult(product, final_price, base_prices)

    def _bulk_generate_uuids(self, size: int) -> List[str]:
        """Return ``size`` freshly generated UUID4 strings."""
        return [str(uuid()) for _ in range(size)]

    def _prepare_price_records(
        self, df: pd.DataFrame, price_type: str, uuids: List[str]
    ) -> List[Dict]:
        """Turn a frame of prices into insertable record dicts.

        ``df`` must already have ``product_id`` and ``price`` columns. It is
        modified in place: ``price_id``, ``type`` and ``date_created`` columns
        are added.
        """
        df['price_id'] = uuids[:len(df)]
        df['type'] = price_type  # price_type should already be a string value
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it
        # is kept here so stored timestamps stay naive UTC.
        df['date_created'] = datetime.utcnow()
        return df[['price_id', 'product_id', 'type', 'price', 'date_created']].to_dict('records')

    def _calculate_suggested_prices_batch(self, product_ids: List[str]) -> Dict[str, float]:
        """Calculate suggested prices for a batch of products in a thread pool.

        Returns a mapping of product id to price, for successful calculations
        only. Unsuccessful results are left out, and exceptions raised by a
        calculation are logged and skipped.
        """
        # NOTE(review): every worker calls calculate_price on this instance,
        # so they all share self.db. SQLAlchemy documents Session as not safe
        # for concurrent use - confirm this is acceptable or give each worker
        # its own session.
        results: Dict[str, float] = {}
        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
            future_to_id = {
                executor.submit(self.calculate_price, str(pid)): pid
                for pid in product_ids
            }
            for future in as_completed(future_to_id):
                product_id = future_to_id[future]
                try:
                    result = future.result()
                    if result.success:
                        results[str(product_id)] = float(result.calculated_price)
                except Exception as e:
                    logger.error(f"Failed to calculate price for product {product_id}: {e}")
        return results

    def _bulk_insert_prices(self, records: List[Dict]) -> None:
        """Append the records to the ``prices`` table; no-op for no records.

        Raises:
            Exception: whatever the insert raises, after logging it.
        """
        if not records:
            return

        try:
            pd.DataFrame(records).to_sql(
                'prices',
                self.db.bind,
                if_exists='append',
                index=False,
                method='multi',
                chunksize=self.CHUNK_SIZE,
            )
        except Exception as e:
            logger.error(f"Failed to bulk insert prices: {e}")
            raise

    def _load_product_mappings(self, tcgplayer_ids: List[Any]) -> pd.DataFrame:
        """Look up ``product_id`` for each tcgplayer id in ``card_tcgplayer``.

        Ids are queried in chunks of ``SQLITE_MAX_VARS`` so that one statement
        never exceeds the bound-parameter limit. The result has the columns
        ``tcgplayer_id`` and ``product_id``.
        """
        frames = []
        for start in range(0, len(tcgplayer_ids), self.SQLITE_MAX_VARS):
            chunk_ids = tcgplayer_ids[start:start + self.SQLITE_MAX_VARS]
            # Values are always bound as parameters; only the placeholder
            # names are interpolated into the SQL text.
            params = {f'id_{j}': id_val for j, id_val in enumerate(chunk_ids)}
            placeholders = ','.join(f':{name}' for name in params)
            product_query = f"""
                SELECT tcgplayer_id, product_id
                FROM card_tcgplayer
                WHERE tcgplayer_id IN ({placeholders})
            """
            frames.append(
                pd.read_sql(text(product_query), self.db.bind, params=params)
            )

        if not frames:
            return pd.DataFrame(columns=['tcgplayer_id', 'product_id'])
        return pd.concat(frames, ignore_index=True)

    def _insert_export_prices(self, merged_df: pd.DataFrame) -> None:
        """Insert one price row per product for each price column present."""
        # Export column name -> stored price type.
        price_columns = {
            'tcg_market_price': PriceType.TCG_MARKET.value,
            'tcg_direct_low': PriceType.TCG_DIRECT_LOW.value,
            'tcg_low_price_with_shipping': PriceType.TCG_LOW_WITH_SHIPPING.value,
            'tcg_low_price': PriceType.TCG_LOW.value,
            'tcg_marketplace_price': PriceType.TCG_MARKETPLACE.value
        }

        for price_col, price_type in price_columns.items():
            valid_prices_df = merged_df[merged_df[price_col].notna()].copy()
            for chunk_start in range(0, len(valid_prices_df), self.CHUNK_SIZE):
                # Copy, because _prepare_price_records adds columns in place.
                chunk_df = valid_prices_df.iloc[chunk_start:chunk_start + self.CHUNK_SIZE].copy()
                uuids = self._bulk_generate_uuids(len(chunk_df))
                chunk_df['price'] = chunk_df[price_col]
                chunk_df['product_id'] = chunk_df['product_id'].astype(str)
                records = self._prepare_price_records(chunk_df, price_type, uuids)
                self._bulk_insert_prices(records)

    def _insert_suggested_prices(self, product_ids: List[Any]) -> None:
        """Calculate and insert ``suggested_price`` rows for the products."""
        suggested_prices: Dict[str, float] = {}
        for chunk_start in range(0, len(product_ids), self.CHUNK_SIZE):
            chunk_ids = product_ids[chunk_start:chunk_start + self.CHUNK_SIZE]
            suggested_prices.update(self._calculate_suggested_prices_batch(chunk_ids))

        if not suggested_prices:
            return

        suggested_df = pd.DataFrame([
            {'product_id': str(pid), 'price': price}
            for pid, price in suggested_prices.items()
        ])
        uuids = self._bulk_generate_uuids(len(suggested_df))
        records = self._prepare_price_records(suggested_df, 'suggested_price', uuids)
        self._bulk_insert_prices(records)

    def process_pricing_export(self, export_csv: bytes) -> None:
        """Import a pricing export and store raw and suggested prices.

        The CSV is parsed with ``DataframeUtil.csv_bytes_to_df``; column names
        are lower-cased with spaces replaced by underscores. Rows are matched
        to products through ``card_tcgplayer``; each price column is inserted
        as its own price type, and afterwards a ``suggested_price`` is
        calculated and inserted for every matched product. An export without
        any tcgplayer id is a no-op.

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            df = self.df_util.csv_bytes_to_df(export_csv)
            df.columns = df.columns.str.lower().str.replace(' ', '_')

            # De-duplicate: an id that lands in two query chunks would
            # otherwise duplicate its mapping row, and with it every price
            # row inserted below.
            tcgplayer_ids = df['tcgplayer_id'].dropna().unique().tolist()
            if not tcgplayer_ids:
                logger.warning("Pricing export contains no tcgplayer ids; nothing to process")
                return

            product_df = self._load_product_mappings(tcgplayer_ids)
            merged_df = pd.merge(df, product_df, on='tcgplayer_id', how='inner')

            # Raw prices go in first, because the suggested-price calculation
            # reads the latest stored prices.
            self._insert_export_prices(merged_df)
            self._insert_suggested_prices(merged_df['product_id'].unique().tolist())

        except Exception as e:
            logger.error(f"Failed to process pricing export: {e}")
            logger.error(f"Error occurred during price processing: {str(e)}")
            raise