409 lines
18 KiB
Python
409 lines
18 KiB
Python
from sqlalchemy.orm import Session
|
|
from app.db.models import File, CardTCGPlayer, Price, TCGPlayerInventory
|
|
from app.services.util._dataframe import TCGPlayerPricingRow, DataframeUtil
|
|
from app.services.file import FileService
|
|
from app.services.tcgplayer import TCGPlayerService
|
|
from uuid import uuid4
|
|
from app.db.utils import db_transaction
|
|
from typing import List, Dict
|
|
from decimal import Decimal, ROUND_HALF_UP
|
|
import pandas as pd
|
|
import numpy as np
|
|
import logging
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Name of the pricing algorithm applied when the caller does not pass one:
# 'default_pricing_algo' or 'tcgplayer_recommended_algo'.
# (The identifier's spelling is kept as-is because other code refers to it.)
ACTIVE_PRICING_ALGORITHIM = 'tcgplayer_recommended_algo'

# When True, tcgplayer_recommended_algo compares against the
# shipping-inclusive low price instead of the plain low price.
FREE_SHIPPING = True
|
|
|
|
|
|
class PricingService:
|
|
def __init__(self, db: Session, file_service: FileService, tcgplayer_service: TCGPlayerService):
    """Keep references to the collaborators the pricing methods rely on.

    Args:
        db: SQLAlchemy session used for card, price and inventory queries.
        file_service: Service used to read the content of stored files.
        tcgplayer_service: Service used to obtain TCGPlayer exports and
            inventory dataframes.
    """
    self.db = db
    self.file_service = file_service
    self.tcgplayer_service = tcgplayer_service

    # Helper used to convert between CSV bytes and DataFrames.
    self.df_util = DataframeUtil()
|
|
|
|
# function for taking a tcgplayer pricing export with all set ids and loading it into the price table
|
|
# can be run as needed or scheduled
|
|
def get_pricing_export_content(self, file: File = None) -> bytes:
    """Return the raw content of a TCGPlayer pricing export.

    Args:
        file: A previously stored export to read. When omitted (or falsy), a
            fresh export covering all products is requested from the
            TCGPlayer service and read instead.

    Returns:
        The export's content as returned by the file service.
    """
    if not file:
        file = self.tcgplayer_service.get_pricing_export_for_all_products()

    # Both paths end the same way: read the file's content by id.
    return self.file_service.get_file_content(file.id)
|
|
|
|
def load_pricing_csv_content_to_db(self, file_content: bytes):
    """Parse a pricing CSV and insert one Price row per positive price.

    The CSV is processed in batches. For each batch, the cards matching the
    batch's TCGPlayer ids are looked up; rows whose id has no matching card
    are skipped. Each batch is saved in its own transaction.

    Args:
        file_content: Raw CSV bytes of a TCGPlayer pricing export.

    Raises:
        ValueError: If ``file_content`` is empty or a required column is
            missing from the CSV.
        Exception: Any other error is logged with its traceback and
            re-raised unchanged.
    """
    batch_size = 1000

    # CSV column name -> value stored in Price.type.
    price_types = {
        "tcg_market_price": "tcg_market_price",
        "tcg_direct_low": "tcg_direct_low",
        "tcg_low_price_with_shipping": "tcg_low_price_with_shipping",
        "tcg_low_price": "tcg_low_price",
        "tcg_marketplace_price": "listed_price",
    }
    required_columns = ["tcgplayer_id", *price_types]

    try:
        if not file_content:
            raise ValueError("No file content provided")

        df = self.df_util.csv_bytes_to_df(file_content)

        missing_columns = set(required_columns) - set(df.columns)
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

        for start in range(0, len(df), batch_size):
            batch = df.iloc[start:start + batch_size]
            pricing_rows = [TCGPlayerPricingRow(row) for _, row in batch.iterrows()]

            # Only query the cards referenced by this batch.
            tcgplayer_ids = [row.tcgplayer_id for row in pricing_rows]
            batch_cards = self.db.query(CardTCGPlayer).filter(
                CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids)
            ).all()
            existing_cards = {card.tcgplayer_id: card for card in batch_cards}

            new_prices = []
            for row in pricing_rows:
                if row.tcgplayer_id not in existing_cards:
                    continue
                card = existing_cards[row.tcgplayer_id]

                for col_name, price_type in price_types.items():
                    value = getattr(row, col_name, None)
                    # Missing, zero and negative prices are not stored.
                    if value is not None and value > 0:
                        new_prices.append(
                            Price(
                                id=str(uuid4()),
                                product_id=card.product_id,
                                marketplace_id=None,
                                type=price_type,
                                price=value,
                            )
                        )

            # Each batch is committed separately.
            if new_prices:
                with db_transaction(self.db):
                    self.db.bulk_save_objects(new_prices)
    except Exception:
        # Record the failure with its traceback, then let the caller see the
        # original exception (bare raise keeps the traceback intact).
        logger.exception("Failed to load pricing CSV content into the price table")
        raise
|
|
|
|
|
|
def cron_load_prices(self, file: File = None):
    """Load cards and then prices from a single pricing export.

    Args:
        file: Optional stored export to use; passed through to
            ``get_pricing_export_content``.
    """
    export_bytes = self.get_pricing_export_content(file)

    # The same export bytes feed both steps: cards first, then prices.
    self.tcgplayer_service.load_tcgplayer_cards(export_bytes)
    self.load_pricing_csv_content_to_db(export_bytes)
|
|
|
|
def get_all_prices_for_products(self, product_ids: List[str]) -> Dict[str, Dict[str, float]]:
    """Fetch stored prices for the given products, grouped by product.

    Args:
        product_ids: Product ids whose Price rows should be fetched.

    Returns:
        A mapping ``{product_id: {price_type: price}}``. If a product has
        several rows of the same type, the last one returned by the query
        wins.
    """
    matching_rows = (
        self.db.query(Price)
        .filter(Price.product_id.in_(product_ids))
        .all()
    )

    prices_by_product = {}
    for entry in matching_rows:
        # Create the inner dict on first sight of a product, then record the price.
        prices_by_product.setdefault(entry.product_id, {})[entry.type] = entry.price
    return prices_by_product
|
|
|
|
def apply_price_to_df_columns(self, row: pd.Series, price_lookup: Dict[str, Dict[str, float]]) -> pd.Series:
    """Write a product's known prices onto its row, one entry per price type.

    Args:
        row: Row containing a ``product_id`` entry. It is modified in place.
        price_lookup: Mapping ``{product_id: {price_type: price}}``.

    Returns:
        The same ``row`` object, with one entry set per price type found for
        its product. Rows whose product is not in the lookup are unchanged.
    """
    prices_for_product = price_lookup.get(row['product_id'], {})
    for column, value in prices_for_product.items():
        row[column] = value
    return row
|
|
|
|
def smooth_markup(self, price, markup_bands):
    """
    Apply a banded markup to ``price``, interpolating in the gaps between bands.

    Args:
        price: The price to mark up. Must support comparison and arithmetic
            with the band values (the caller in this file passes ``Decimal``).
        markup_bands: Mapping of ``markup -> (min_price, max_price)``. Each
            range is inclusive on both ends.

    Returns:
        ``price * markup``, in the numeric type of the inputs, where the
        markup is:
          * the markup of the first band (in mapping order) containing the price;
          * otherwise, a linear interpolation between the markups of the
            closest band below and the closest band above the price;
          * otherwise (price below or above every band), the markup of the
            nearest band.

    Raises:
        ValueError: If ``markup_bands`` is empty or the price cannot be
            positioned relative to any band (e.g. a NaN price).
    """
    if not markup_bands:
        raise ValueError("markup_bands must contain at least one band")

    bands = [(low, high, markup) for markup, (low, high) in markup_bands.items()]

    # Price falls inside a band: use that band's markup directly.
    for low, high, markup in bands:
        if low <= price <= high:
            return price * markup

    # Price is in a gap or outside every band: find the nearest neighbours.
    # The closest band below is the one with the largest upper bound; the
    # closest band above is the one with the smallest lower bound.
    lower = max((band for band in bands if band[1] < price), key=lambda band: band[1], default=None)
    upper = min((band for band in bands if band[0] > price), key=lambda band: band[0], default=None)

    if lower is None and upper is None:
        raise ValueError(f"Cannot determine a markup band for price: {price}")

    if lower is None:
        # Below every band: clamp to the lowest band's markup.
        markup = upper[2]
    elif upper is None:
        # Above every band: clamp to the highest band's markup.
        markup = lower[2]
    else:
        # Position of the price within the gap, from 0 (at the lower band's
        # upper bound) to 1 (at the upper band's lower bound). Plain
        # arithmetic keeps Decimal inputs as Decimal.
        fraction = (price - lower[1]) / (upper[0] - lower[1])
        markup = lower[2] + (upper[2] - lower[2]) * fraction

    return price * markup
|
|
|
|
def tcgplayer_recommended_algo(self, row: pd.Series) -> pd.Series:
    """Compute ``new_price`` for a row from the TCG low price and markup bands.

    Steps:
      1. Read prices from the row as ``Decimal`` and quantities as ``int``
         (missing values become ``None`` / ``0``).
      2. If there is no market price, or no usable comparison price, set
         ``new_price`` to ``None`` and return.
      3. Pick the comparison price: with ``FREE_SHIPPING`` it is the low price
         with shipping, falling back to the low price plus 1.31; otherwise it
         is the low price.
      4. Apply the banded markup (bands are raised when quantity > 3).
      5. Enforce a 0.25 minimum, keep the current price if the new price
         would fall below a quarter of it, and round to two decimals.

    Args:
        row: Inventory row. Reads ``tcg_low_price``,
            ``tcg_low_price_with_shipping``, ``tcg_market_price``,
            ``tcg_marketplace_price``, ``total_quantity``, ``add_to_quantity``.

    Returns:
        The same row with ``new_price`` set to a float, or ``None`` when the
        price could not be computed.
    """
    def optional_decimal(key):
        value = row.get(key)
        return None if pd.isna(value) else Decimal(str(value))

    def whole_number(key):
        value = row.get(key)
        # Quantities may arrive as floats (e.g. 4.0) when the column contains
        # missing values; int("4.0") raises, so go through Decimal.
        return 0 if pd.isna(value) else int(Decimal(str(value)))

    tcg_low = optional_decimal('tcg_low_price')
    tcg_low_shipping = optional_decimal('tcg_low_price_with_shipping')
    tcg_market_price = optional_decimal('tcg_market_price')
    current_price = optional_decimal('tcg_marketplace_price')
    total_quantity = whole_number('total_quantity')
    added_quantity = whole_number('add_to_quantity')
    quantity = total_quantity + added_quantity

    if tcg_market_price is None:
        logger.warning(f"Missing pricing data for row: {row}")
        row['new_price'] = None
        return row

    TWO_PLACES = Decimal('0.01')

    # markup multiplier -> inclusive (min_price, max_price) range
    markup_bands = {
        Decimal('2.53'): (Decimal('0.01'), Decimal('0.50')),
        Decimal('1.42'): (Decimal('0.51'), Decimal('1.00')),
        Decimal('1.29'): (Decimal('1.01'), Decimal('3.00')),
        Decimal('1.17'): (Decimal('3.01'), Decimal('20.00')),
        Decimal('1.07'): (Decimal('20.01'), Decimal('35.00')),
        Decimal('1.05'): (Decimal('35.01'), Decimal('50.00')),
        Decimal('1.03'): (Decimal('50.01'), Decimal('100.00')),
        Decimal('1.02'): (Decimal('100.01'), Decimal('200.00')),
        Decimal('1.01'): (Decimal('200.01'), Decimal('1000.00')),
    }

    # With more than 3 copies, raise every markup; the bump starts at 0.10
    # for the cheapest band and shrinks by 0.01 per band.
    if quantity > 3:
        increment = Decimal('0.10')
        adjusted_bands = {}
        for markup, price_range in markup_bands.items():
            adjusted_bands[markup + increment] = price_range
            increment -= Decimal('0.01')
        markup_bands = adjusted_bands

    if FREE_SHIPPING:
        if tcg_low_shipping:
            tcg_compare_price = tcg_low_shipping
        elif tcg_low:
            # NOTE(review): 1.31 is presumably a flat shipping estimate — confirm.
            tcg_compare_price = tcg_low + Decimal('1.31')
        else:
            logger.warning(f"No TCG low or shipping price available for row: {row}")
            row['new_price'] = None
            return row
    else:
        tcg_compare_price = tcg_low
        if tcg_compare_price is None:
            logger.warning(f"No TCG low price available for row: {row}")
            row['new_price'] = None
            return row

    new_price = self.smooth_markup(tcg_compare_price, markup_bands)

    # Enforce minimum price
    if new_price < Decimal('0.25'):
        new_price = Decimal('0.25')

    # Avoid huge price drops: if the new price is less than a quarter of the
    # currently listed price, keep the current price. A zero (or negative)
    # current price is skipped so the ratio is always defined.
    if (
        current_price is not None
        and current_price > 0
        and new_price / current_price < Decimal('0.25')
    ):
        logger.warning(f"Price drop too large for row: {row}")
        new_price = current_price

    new_price = new_price.quantize(TWO_PLACES, rounding=ROUND_HALF_UP)

    # Convert back to float for the dataframe
    row['new_price'] = float(new_price)

    # row.get() is used so a missing descriptive column cannot break pricing.
    logger.debug(f"""
    card: {row.get('product_name')}
    TCGplayer Id: {row.get('tcgplayer_id')}
    Algorithm: {ACTIVE_PRICING_ALGORITHIM}
    TCG Low: {tcg_low}
    TCG Low Shipping: {tcg_low_shipping}
    TCG Market Price: {tcg_market_price}
    Current Price: {current_price}
    Total Quantity: {total_quantity}
    Added Quantity: {added_quantity}
    Quantity: {quantity}
    TCG Compare Price: {tcg_compare_price}
    New Price: {new_price}
    """)

    return row
|
|
|
|
|
|
def default_pricing_algo(self, row: pd.Series) -> pd.Series:
    """Compute ``new_price`` from the market price using tiered multipliers.

    The market price is multiplied by a factor chosen by price tier, floored
    at 0.25, multiplied by 1.1 when the combined quantity exceeds 3, and
    rounded half-up to two decimals.

    Args:
        row: Inventory row. Reads ``tcg_market_price``, ``total_quantity``
            and ``add_to_quantity``.

    Returns:
        The same row with ``new_price`` set to a float, or ``None`` when the
        market price is missing.
    """
    def whole_number(key):
        value = row.get(key)
        # Quantities may arrive as floats (e.g. 4.0) when the column contains
        # missing values; int("4.0") raises, so go through Decimal.
        return 0 if pd.isna(value) else int(Decimal(str(value)))

    market_value = row.get('tcg_market_price')
    tcg_market_price = None if pd.isna(market_value) else Decimal(str(market_value))
    quantity = whole_number('total_quantity') + whole_number('add_to_quantity')

    if tcg_market_price is None:
        logger.warning(f"Missing pricing data for row: {row}")
        row['new_price'] = None
        return row

    # Define precision for rounding
    TWO_PLACES = Decimal('0.01')

    # Tiered pricing rules; the first matching branch wins. A market price of
    # exactly 0.25 matches neither of the first two branches and is handled
    # by the "< 5" tier.
    if Decimal('0.25') < tcg_market_price < Decimal('1'):
        new_price = tcg_market_price * Decimal('1.25')
    elif tcg_market_price < Decimal('0.25'):
        new_price = Decimal('0.25')
    elif tcg_market_price < Decimal('5'):
        new_price = tcg_market_price * Decimal('1.08')
    elif tcg_market_price < Decimal('10'):
        new_price = tcg_market_price * Decimal('1.06')
    elif tcg_market_price < Decimal('20'):
        new_price = tcg_market_price * Decimal('1.0125')
    elif tcg_market_price < Decimal('50'):
        new_price = tcg_market_price * Decimal('0.99')
    elif tcg_market_price < Decimal('100'):
        new_price = tcg_market_price * Decimal('0.98')
    else:
        new_price = tcg_market_price * Decimal('1.09')

    # Enforce minimum price
    if new_price < Decimal('0.25'):
        new_price = Decimal('0.25')

    if quantity > 3:
        new_price = new_price * Decimal('1.1')

    # Ensure exactly 2 decimal places, then store as float for the dataframe
    new_price = new_price.quantize(TWO_PLACES, rounding=ROUND_HALF_UP)
    row['new_price'] = float(new_price)
    return row
|
|
|
|
def apply_pricing_algo(self, row: pd.Series, pricing_algo: callable = None) -> pd.Series:
    """Price a row with the given algorithm, or with the configured one.

    Args:
        row: Inventory row to price.
        pricing_algo: Optional callable taking and returning a row. When
            given, it is used instead of the configured algorithm.

    Returns:
        Whatever the selected algorithm returns for ``row``.
    """
    # An explicitly supplied algorithm always takes precedence.
    if pricing_algo:
        logger.debug(f"Using custom pricing algorithm: {pricing_algo.__name__}")
        return pricing_algo(row)

    if ACTIVE_PRICING_ALGORITHIM == 'tcgplayer_recommended_algo':
        logger.debug(f"Using TCGPlayer recommended algorithm: {self.tcgplayer_recommended_algo.__name__}")
        selected = self.tcgplayer_recommended_algo
    else:
        # Covers both the explicit 'default_pricing_algo' setting and any
        # unrecognised value, which fall back to the default algorithm.
        logger.debug(f"Using default pricing algorithm: {self.default_pricing_algo.__name__}")
        selected = self.default_pricing_algo

    return selected(row)
|
|
|
|
def generate_tcgplayer_inventory_update_file_with_pricing(self, open_box_ids: List[str] = None) -> bytes:
    """Build a TCGPlayer inventory CSV with freshly computed prices.

    With ``open_box_ids`` the file is built from those open boxes ('add'
    mode). Without them it is built from the live inventory ('update' mode):
    rows with zero total quantity are dropped, rows whose computed price
    equals the listed price are dropped, and the TCGPlayerInventory table is
    replaced with the resulting rows.

    Args:
        open_box_ids: Optional ids of open boxes whose cards should be added.

    Returns:
        The CSV, as bytes, restricted to the TCGPlayer column layout.
    """
    desired_columns = [
        'TCGplayer Id', 'Product Line', 'Set Name', 'Product Name',
        'Title', 'Number', 'Rarity', 'Condition', 'TCG Market Price',
        'TCG Direct Low', 'TCG Low Price With Shipping', 'TCG Low Price',
        'Total Quantity', 'Add to Quantity', 'TCG Marketplace Price', 'Photo URL'
    ]

    # Internal column name -> TCGPlayer CSV header.
    column_mapping = {
        'tcgplayer_id': 'TCGplayer Id',
        'product_line': 'Product Line',
        'set_name': 'Set Name',
        'product_name': 'Product Name',
        'title': 'Title',
        'number': 'Number',
        'rarity': 'Rarity',
        'condition': 'Condition',
        'tcg_market_price': 'TCG Market Price',
        'tcg_direct_low': 'TCG Direct Low',
        'tcg_low_price_with_shipping': 'TCG Low Price With Shipping',
        'tcg_low_price': 'TCG Low Price',
        'total_quantity': 'Total Quantity',
        'add_to_quantity': 'Add to Quantity',
        'photo_url': 'Photo URL'
    }

    if open_box_ids:
        update_type = 'add'
        df = self.tcgplayer_service.open_box_cards_to_tcgplayer_inventory_df(open_box_ids)
    else:
        update_type = 'update'
        df = self.tcgplayer_service.get_inventory_df('live')
        # Remove rows with total quantity of 0. Copy so the column
        # assignments below act on an independent frame, not on a slice.
        df = df[df['total_quantity'] != 0].copy()

    tcgplayer_ids = df['tcgplayer_id'].unique().tolist()

    # Make a single query to get all matching records
    product_id_mapping = {
        card.tcgplayer_id: card.product_id
        for card in self.db.query(CardTCGPlayer)
        .filter(CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids))
        .all()
    }

    # Map tcgplayer_id to product_id, ensure strings, keep None if not found
    df['product_id'] = df['tcgplayer_id'].map(product_id_mapping).apply(lambda x: str(x) if pd.notnull(x) else None)

    # Log any tcgplayer_ids that didn't map to a product_id
    null_product_ids = df[df['product_id'].isnull()]['tcgplayer_id'].tolist()
    if null_product_ids:
        logger.warning(f"The following tcgplayer_ids could not be mapped to a product_id: {null_product_ids}")

    # Pass a plain list of the ids that were actually resolved; unmapped rows
    # have no product to look up.
    known_product_ids = df['product_id'].dropna().unique().tolist()
    price_lookup = self.get_all_prices_for_products(known_product_ids)

    # Apply price columns
    df = df.apply(lambda row: self.apply_price_to_df_columns(row, price_lookup), axis=1)

    logger.debug(f"Applying pricing algorithm: {ACTIVE_PRICING_ALGORITHIM}")

    # Apply pricing algorithm
    df = df.apply(self.apply_pricing_algo, axis=1)

    # In update mode, only rows whose price actually changes are kept.
    if update_type == 'update':
        df = df[df['new_price'] != df['listed_price']].copy()

    # Set marketplace price
    df['TCG Marketplace Price'] = df['new_price']
    df['Title'] = ''

    df = df.rename(columns=column_mapping)
    df = df[desired_columns]

    if update_type == 'update':
        with db_transaction(self.db):
            # Replace the stored inventory snapshot with the rows being sent.
            self.db.query(TCGPlayerInventory).delete()
            self.db.flush()

            # Work on a copy so the exported frame keeps its CSV headers.
            df_copy = df.copy()
            df_copy['id'] = [str(uuid4()) for _ in range(len(df_copy))]
            # Rename columns to lowercase with underscores
            df_copy.columns = df_copy.columns.str.lower().str.replace(' ', '_')
            for _, row in df_copy.iterrows():
                self.db.add(TCGPlayerInventory(**row.to_dict()))

    # NOTE(review): a filter dropping rows with a zero or missing marketplace
    # price was disabled here; such rows are still exported — confirm that
    # this is intended.

    # Convert to CSV bytes
    csv_bytes = self.df_util.df_to_csv_bytes(df)

    return csv_bytes