giga_tcg/app/services/pricing.py
zman a97ba6e858
All checks were successful
Deploy App to Docker / deploy (push) Successful in 28s
asdf
2025-04-14 21:01:04 -04:00

413 lines
18 KiB
Python

from sqlalchemy.orm import Session
from app.db.models import File, CardTCGPlayer, Price, TCGPlayerInventory
from app.services.util._dataframe import TCGPlayerPricingRow, DataframeUtil
from app.services.file import FileService
from app.services.tcgplayer import TCGPlayerService
from uuid import uuid4
from app.db.utils import db_transaction
from typing import List, Dict
from decimal import Decimal, ROUND_HALF_UP
import pandas as pd
import numpy as np
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Name of the pricing method PricingService.apply_pricing_algo dispatches to
# when the caller does not pass an explicit algorithm. Any value other than the
# two listed below falls back to the default algorithm.
# NOTE: the identifier is misspelled ("ALGORITHIM"); it is kept as-is because
# it is referenced by name elsewhere in this module.
ACTIVE_PRICING_ALGORITHIM = 'tcgplayer_recommended_algo' # 'default_pricing_algo' or 'tcgplayer_recommended_algo'
# NOTE(review): presumably signals that listings are offered with free
# shipping. No price calculation in this module currently depends on it (the
# shipping-aware comparison is disabled) -- confirm before relying on it.
FREE_SHIPPING = True
class PricingService:
    """Load TCGplayer pricing exports and compute listing prices for inventory.

    The service stores prices from a TCGplayer pricing export in the ``Price``
    table, looks those prices up per product, applies one of the pricing
    algorithms to each inventory row, and produces a TCGplayer inventory
    update file as CSV bytes.
    """

    # Number of CSV rows handled per query/insert round-trip when loading prices.
    PRICE_LOAD_BATCH_SIZE = 1000

    # Markup multiplier -> inclusive (min_price, max_price) range it applies to.
    # Used by tcgplayer_recommended_algo; never mutated in place.
    RECOMMENDED_MARKUP_BANDS = {
        Decimal('2.34'): (Decimal('0.01'), Decimal('0.50')),
        Decimal('1.36'): (Decimal('0.51'), Decimal('1.00')),
        Decimal('1.24'): (Decimal('1.01'), Decimal('3.00')),
        Decimal('1.15'): (Decimal('3.01'), Decimal('20.00')),
        Decimal('1.06'): (Decimal('20.01'), Decimal('35.00')),
        Decimal('1.05'): (Decimal('35.01'), Decimal('50.00')),
        Decimal('1.03'): (Decimal('50.01'), Decimal('100.00')),
        Decimal('1.02'): (Decimal('100.01'), Decimal('200.00')),
        Decimal('1.01'): (Decimal('200.01'), Decimal('1000.00')),
    }

    def __init__(self, db: Session, file_service: FileService, tcgplayer_service: TCGPlayerService):
        """Store the collaborators used by every other method.

        Args:
            db: Database session used for all queries and writes.
            file_service: Used to read stored file content by file id.
            tcgplayer_service: Used to fetch pricing exports and inventory.
        """
        self.db = db
        self.file_service = file_service
        self.tcgplayer_service = tcgplayer_service
        self.df_util = DataframeUtil()

    @staticmethod
    def _to_decimal(value):
        """Return ``value`` as a ``Decimal``, or ``None`` when it is missing/NaN."""
        if pd.isna(value):
            return None
        # Go through str() so floats keep their printed value, not binary noise.
        return Decimal(str(value))

    @staticmethod
    def _to_quantity(value) -> int:
        """Return ``value`` as an ``int`` quantity, treating missing/NaN as 0.

        Accepts ints, floats and numeric strings. Float-typed values such as
        ``4.0`` (typical for pandas columns that contain NaN) are accepted;
        a non-numeric string raises ``ValueError``.
        """
        if pd.isna(value):
            return 0
        return int(float(value))

    def get_pricing_export_content(self, file: File = None) -> bytes:
        """Return the raw content of a TCGplayer pricing export.

        Args:
            file: A stored export to read. When omitted (or falsy), a new
                export covering all products is requested from the TCGplayer
                service and its content is returned instead.

        Returns:
            The content returned by the file service for the export's id.
        """
        if not file:
            file = self.tcgplayer_service.get_pricing_export_for_all_products()
        return self.file_service.get_file_content(file.id)

    def load_pricing_csv_content_to_db(self, file_content: bytes):
        """Insert ``Price`` rows for every known card in a pricing CSV.

        The CSV is processed in batches of ``PRICE_LOAD_BATCH_SIZE`` rows. For
        each batch, the matching ``CardTCGPlayer`` records are fetched in one
        query; rows whose ``tcgplayer_id`` is unknown are skipped. One
        ``Price`` is created per price column whose value is present and
        greater than zero, and each batch is saved in its own transaction.

        Args:
            file_content: CSV bytes of a TCGplayer pricing export.

        Raises:
            ValueError: If ``file_content`` is empty or a required column is
                missing.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            if not file_content:
                raise ValueError("No file content provided")

            # CSV column name -> value stored in Price.type
            price_types = {
                "tcg_market_price": "tcg_market_price",
                "tcg_direct_low": "tcg_direct_low",
                "tcg_low_price_with_shipping": "tcg_low_price_with_shipping",
                "tcg_low_price": "tcg_low_price",
                "tcg_marketplace_price": "listed_price",
            }
            required_columns = ["tcgplayer_id"] + list(price_types)

            df = self.df_util.csv_bytes_to_df(file_content)

            missing_columns = set(required_columns) - set(df.columns)
            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")

            batch_size = self.PRICE_LOAD_BATCH_SIZE
            for start in range(0, len(df), batch_size):
                batch = df.iloc[start:start + batch_size]
                pricing_rows = [TCGPlayerPricingRow(row) for _, row in batch.iterrows()]

                # Query cards for this batch only
                tcgplayer_ids = [row.tcgplayer_id for row in pricing_rows]
                batch_cards = self.db.query(CardTCGPlayer).filter(
                    CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids)
                ).all()
                existing_cards = {card.tcgplayer_id: card for card in batch_cards}

                new_prices = []
                for row in pricing_rows:
                    card = existing_cards.get(row.tcgplayer_id)
                    if card is None:
                        continue
                    for col_name, price_type in price_types.items():
                        value = getattr(row, col_name, None)
                        # NaN compares False against 0, so it is skipped too.
                        if value is not None and value > 0:
                            new_prices.append(
                                Price(
                                    id=str(uuid4()),
                                    product_id=card.product_id,
                                    marketplace_id=None,
                                    type=price_type,
                                    price=value,
                                )
                            )

                # Save each batch separately
                if new_prices:
                    with db_transaction(self.db):
                        self.db.bulk_save_objects(new_prices)
        except Exception:
            logger.exception("Failed to load pricing CSV content into the database")
            raise

    def cron_load_prices(self, file: File = None):
        """Load a TCGplayer pricing export into the card and price tables.

        Can be run as needed or on a schedule. The export content is fetched
        once, passed to the TCGplayer service to load cards, and then loaded
        into the price table.

        Args:
            file: Optional stored export; see ``get_pricing_export_content``.
        """
        file_content = self.get_pricing_export_content(file)
        self.tcgplayer_service.load_tcgplayer_cards(file_content)
        self.load_pricing_csv_content_to_db(file_content)

    def get_all_prices_for_products(self, product_ids: List[str]) -> Dict[str, Dict[str, float]]:
        """Return ``{product_id: {price_type: price}}`` for the given products.

        NOTE(review): the query has no ordering, so if several ``Price`` rows
        exist for the same product and type, whichever the database returns
        last wins -- confirm whether the most recent row should be selected.
        """
        all_prices = self.db.query(Price).filter(
            Price.product_id.in_(product_ids)
        ).all()
        price_lookup: Dict[str, Dict[str, float]] = {}
        for price in all_prices:
            price_lookup.setdefault(price.product_id, {})[price.type] = price.price
        return price_lookup

    def apply_price_to_df_columns(self, row: pd.Series, price_lookup: Dict[str, Dict[str, float]]) -> pd.Series:
        """Copy the row's product prices into the row, one entry per price type.

        Rows whose ``product_id`` is not in ``price_lookup`` are returned
        unchanged.
        """
        product_prices = price_lookup.get(row['product_id'], {})
        for price_type, price in product_prices.items():
            row[price_type] = price
        return row

    def smooth_markup(self, price, markup_bands):
        """Return ``price`` multiplied by the markup selected from ``markup_bands``.

        Args:
            price: The price to mark up (``Decimal`` in this module).
            markup_bands: Mapping of markup multiplier to an inclusive
                ``(min_price, max_price)`` range.

        Returns:
            ``price * markup``, where the markup is chosen as follows:

            * inside a band: that band's markup;
            * in a gap between two bands: linear interpolation between the
              markup of the closest band below and the closest band above;
            * below every band / above every band: the markup of the lowest /
              highest band (previously this raised ``UnboundLocalError``).

        Raises:
            ValueError: If ``markup_bands`` is empty or no markup can be
                determined for ``price`` (e.g. a NaN float).
        """
        bands = [(low, high, markup) for markup, (low, high) in markup_bands.items()]
        if not bands:
            raise ValueError("markup_bands must not be empty")

        for low, high, markup in bands:
            if low <= price <= high:
                return price * markup

        # Not inside any band: every band lies entirely below or above price.
        below = [band for band in bands if band[1] < price]
        above = [band for band in bands if band[0] > price]
        if not below and not above:
            raise ValueError(f"Cannot determine markup for price {price!r}")
        if not above:
            return price * max(below, key=lambda band: band[1])[2]
        if not below:
            return price * min(above, key=lambda band: band[0])[2]

        _, lower_max, lower_markup = max(below, key=lambda band: band[1])
        upper_min, _, upper_markup = min(above, key=lambda band: band[0])
        # Plain arithmetic keeps the operand type (Decimal stays Decimal).
        fraction = (price - lower_max) / (upper_min - lower_max)
        markup = lower_markup + (upper_markup - lower_markup) * fraction
        return price * markup

    def tcgplayer_recommended_algo(self, row: pd.Series) -> pd.Series:
        """Price a row by marking up its TCG Low price with banded markups.

        Reads ``tcg_low_price``, ``tcg_low_price_with_shipping``,
        ``tcg_market_price``, ``tcg_marketplace_price``, ``total_quantity``
        and ``add_to_quantity`` from ``row`` and writes ``new_price``.

        ``new_price`` is set to ``None`` when the market price or the TCG Low
        price is missing. Otherwise it is a float rounded to two decimals
        (half up).

        Returns:
            The same row with ``new_price`` set.
        """
        # Convert input values to Decimal for precise arithmetic
        tcg_low = self._to_decimal(row.get('tcg_low_price'))
        tcg_low_shipping = self._to_decimal(row.get('tcg_low_price_with_shipping'))
        tcg_market_price = self._to_decimal(row.get('tcg_market_price'))
        current_price = self._to_decimal(row.get('tcg_marketplace_price'))
        total_quantity = self._to_quantity(row.get('total_quantity'))
        added_quantity = self._to_quantity(row.get('add_to_quantity'))
        quantity = total_quantity + added_quantity

        if tcg_market_price is None:
            logger.warning(f"Missing pricing data for row: {row}")
            row['new_price'] = None
            return row

        # The shipping-inclusive comparison is currently disabled; the markup
        # is applied to TCG Low only.
        tcg_compare_price = tcg_low
        if tcg_compare_price is None:
            logger.warning(f"No TCG low price available for row: {row}")
            row['new_price'] = None
            return row

        TWO_PLACES = Decimal('0.01')

        markup_bands = self.RECOMMENDED_MARKUP_BANDS
        # Adjust markups if quantity is high: the first band gets +0.10 and
        # each following band gets 0.01 less than the one before it.
        if quantity > 3:
            adjusted_bands = {}
            increment = Decimal('0.10')
            for markup, price_range in markup_bands.items():
                adjusted_bands[markup + increment] = price_range
                increment -= Decimal('0.01')
            markup_bands = adjusted_bands

        # Apply the smoothed markup
        new_price = self.smooth_markup(tcg_compare_price, markup_bands)

        # Enforce minimum price
        # NOTE(review): the threshold (0.35) and the floor (0.25) differ, so a
        # computed 0.30 becomes 0.25 -- confirm this is intended.
        if new_price < Decimal('0.35'):
            new_price = Decimal('0.25')

        # Avoid huge price drops: keep the current price when the new one
        # would be more than 50% lower. A current price of 0 is skipped to
        # avoid dividing by zero.
        if current_price is not None and current_price > 0:
            drop_ratio = (current_price - new_price) / current_price
            if drop_ratio > Decimal('0.5'):
                logger.warning(f"Price drop too large for row: {row}")
                new_price = current_price

        # Round to 2 decimal places
        new_price = new_price.quantize(TWO_PLACES, rounding=ROUND_HALF_UP)

        # Convert back to float for dataframe
        row['new_price'] = float(new_price)

        # Lazy %-formatting: nothing is rendered unless debug logging is on,
        # and row.get avoids a KeyError when a descriptive column is absent.
        logger.debug(
            "\n"
            "card: %s\n"
            "TCGplayer Id: %s\n"
            "Algorithm: %s\n"
            "TCG Low: %s\n"
            "TCG Low Shipping: %s\n"
            "TCG Market Price: %s\n"
            "Current Price: %s\n"
            "Total Quantity: %s\n"
            "Added Quantity: %s\n"
            "Quantity: %s\n"
            "TCG Compare Price: %s\n"
            "New Price: %s\n",
            row.get('product_name'),
            row.get('tcgplayer_id'),
            ACTIVE_PRICING_ALGORITHIM,
            tcg_low,
            tcg_low_shipping,
            tcg_market_price,
            current_price,
            total_quantity,
            added_quantity,
            quantity,
            tcg_compare_price,
            new_price,
        )
        return row

    def default_pricing_algo(self, row: pd.Series) -> pd.Series:
        """Price a row from its TCG market price using tiered multipliers.

        Reads ``tcg_market_price``, ``total_quantity`` and ``add_to_quantity``
        from ``row`` and writes ``new_price``: ``None`` when the market price
        is missing, otherwise a float rounded to two decimals (half up). The
        price is floored at 0.25 and multiplied by 1.1 when the combined
        quantity exceeds 3.

        Returns:
            The same row with ``new_price`` set.
        """
        # Convert input values to Decimal for precise arithmetic
        tcg_market_price = self._to_decimal(row.get('tcg_market_price'))
        quantity = (
            self._to_quantity(row.get('total_quantity'))
            + self._to_quantity(row.get('add_to_quantity'))
        )

        if tcg_market_price is None:
            logger.warning(f"Missing pricing data for row: {row}")
            row['new_price'] = None
            return row

        # Define precision for rounding
        TWO_PLACES = Decimal('0.01')

        # Apply pricing rules
        # NOTE(review): a market price of exactly 0.25 matches neither of the
        # first two branches and is priced by the "< 5" rule -- confirm.
        if tcg_market_price < Decimal('1') and tcg_market_price > Decimal('0.25'):
            new_price = tcg_market_price * Decimal('1.25')
        elif tcg_market_price < Decimal('0.25'):
            new_price = Decimal('0.25')
        elif tcg_market_price < Decimal('5'):
            new_price = tcg_market_price * Decimal('1.08')
        elif tcg_market_price < Decimal('10'):
            new_price = tcg_market_price * Decimal('1.06')
        elif tcg_market_price < Decimal('20'):
            new_price = tcg_market_price * Decimal('1.0125')
        elif tcg_market_price < Decimal('50'):
            new_price = tcg_market_price * Decimal('0.99')
        elif tcg_market_price < Decimal('100'):
            new_price = tcg_market_price * Decimal('0.98')
        else:
            new_price = tcg_market_price * Decimal('1.09')

        if new_price < Decimal('0.25'):
            new_price = Decimal('0.25')

        if quantity > 3:
            new_price = new_price * Decimal('1.1')

        # Ensure exactly 2 decimal places
        new_price = new_price.quantize(TWO_PLACES, rounding=ROUND_HALF_UP)

        # Convert back to float for the dataframe
        row['new_price'] = float(new_price)
        return row

    def apply_pricing_algo(self, row: pd.Series, pricing_algo: callable = None) -> pd.Series:
        """Apply a pricing algorithm to ``row`` and return the result.

        Args:
            row: The inventory row to price.
            pricing_algo: Optional callable taking and returning a row. When
                omitted, the method named by ``ACTIVE_PRICING_ALGORITHIM`` is
                used; unrecognised names fall back to
                ``default_pricing_algo``.
        """
        if pricing_algo:
            logger.debug(f"Using custom pricing algorithm: {pricing_algo.__name__}")
            return pricing_algo(row)

        if ACTIVE_PRICING_ALGORITHIM == 'tcgplayer_recommended_algo':
            logger.debug(f"Using TCGPlayer recommended algorithm: {self.tcgplayer_recommended_algo.__name__}")
            selected_algo = self.tcgplayer_recommended_algo
        else:
            logger.debug(f"Using default pricing algorithm: {self.default_pricing_algo.__name__}")
            selected_algo = self.default_pricing_algo
        return selected_algo(row)

    def generate_tcgplayer_inventory_update_file_with_pricing(self, open_box_ids: List[str] = None) -> bytes:
        """Build a priced TCGplayer inventory file and return it as CSV bytes.

        With ``open_box_ids`` the file is an "add" file built from those open
        boxes. Without it, the file is an "update" of the live inventory:
        zero-quantity rows are dropped, rows whose new price equals the
        stored ``listed_price`` are dropped, and the ``TCGPlayerInventory``
        table is replaced with the rows of the resulting file.

        Args:
            open_box_ids: Ids of open boxes whose cards should be added.

        Returns:
            CSV bytes containing the TCGplayer upload columns, with
            ``TCG Marketplace Price`` set to the computed price and ``Title``
            left blank.
        """
        desired_columns = [
            'TCGplayer Id', 'Product Line', 'Set Name', 'Product Name',
            'Title', 'Number', 'Rarity', 'Condition', 'TCG Market Price',
            'TCG Direct Low', 'TCG Low Price With Shipping', 'TCG Low Price',
            'Total Quantity', 'Add to Quantity', 'TCG Marketplace Price', 'Photo URL'
        ]

        if open_box_ids:
            update_type = 'add'
            df = self.tcgplayer_service.open_box_cards_to_tcgplayer_inventory_df(open_box_ids)
        else:
            update_type = 'update'
            df = self.tcgplayer_service.get_inventory_df('live')
            # remove rows with total quantity of 0; copy so the column
            # assignments below do not write into a view of the original frame
            df = df[df['total_quantity'] != 0].copy()

        tcgplayer_ids = df['tcgplayer_id'].unique().tolist()

        # Make a single query to get all matching records
        matching_cards = (
            self.db.query(CardTCGPlayer)
            .filter(CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids))
            .all()
        )
        product_id_mapping = {card.tcgplayer_id: card.product_id for card in matching_cards}

        # Map tcgplayer_id to product_id, ensure strings, keep None if not found
        df['product_id'] = df['tcgplayer_id'].map(product_id_mapping).apply(
            lambda x: str(x) if pd.notnull(x) else None
        )

        # Log any tcgplayer_ids that didn't map to a product_id
        null_product_ids = df[df['product_id'].isnull()]['tcgplayer_id'].tolist()
        if null_product_ids:
            logger.warning(f"The following tcgplayer_ids could not be mapped to a product_id: {null_product_ids}")

        # Unmapped rows have no product_id and therefore nothing to look up.
        price_lookup = self.get_all_prices_for_products(
            df['product_id'].dropna().unique().tolist()
        )

        # Apply price columns
        df = df.apply(lambda row: self.apply_price_to_df_columns(row, price_lookup), axis=1)

        logger.debug(f"Applying pricing algorithm: {ACTIVE_PRICING_ALGORITHIM}")

        # Apply pricing algorithm
        df = df.apply(self.apply_pricing_algo, axis=1)

        # if update type is update, remove rows where new_price == listed_price.
        # The listed_price column only exists when at least one product has a
        # stored listed price; without it there is nothing to compare against.
        if update_type == 'update' and 'listed_price' in df.columns:
            df = df[df['new_price'] != df['listed_price']]

        column_mapping = {
            'tcgplayer_id': 'TCGplayer Id',
            'product_line': 'Product Line',
            'set_name': 'Set Name',
            'product_name': 'Product Name',
            'title': 'Title',
            'number': 'Number',
            'rarity': 'Rarity',
            'condition': 'Condition',
            'tcg_market_price': 'TCG Market Price',
            'tcg_direct_low': 'TCG Direct Low',
            'tcg_low_price_with_shipping': 'TCG Low Price With Shipping',
            'tcg_low_price': 'TCG Low Price',
            'total_quantity': 'Total Quantity',
            'add_to_quantity': 'Add to Quantity',
            'photo_url': 'Photo URL'
        }
        df = df.rename(columns=column_mapping)

        # Assign after renaming so an incoming 'title' column is overwritten
        # instead of producing a second 'Title' column.
        df['TCG Marketplace Price'] = df['new_price']
        df['Title'] = ''

        # Now do the column selection
        df = df[desired_columns]

        if update_type == 'update':
            # NOTE(review): the table is replaced with only the rows that
            # remain after the price-change filter above -- confirm that a
            # partial snapshot is intended.
            with db_transaction(self.db):
                self.db.query(TCGPlayerInventory).delete()
                self.db.flush()
                # copy df to modify before inserting
                df_copy = df.copy()
                df_copy['id'] = [str(uuid4()) for _ in range(len(df_copy))]
                # rename columns lowercase no space
                df_copy.columns = df_copy.columns.str.lower().str.replace(' ', '_')
                for _, inventory_row in df_copy.iterrows():
                    self.db.add(TCGPlayerInventory(**inventory_row.to_dict()))

        # Rows without a computed price are intentionally not filtered out here.
        # Convert to CSV bytes
        return self.df_util.df_to_csv_bytes(df)