# giga_tcg/app/services/pricing.py
# 2025-02-07 20:29:39 -05:00
#
# 219 lines
# 8.8 KiB
# Python

from sqlalchemy.orm import Session
from db.models import File, CardTCGPlayer, Price
from services.util._dataframe import TCGPlayerPricingRow, DataframeUtil
from services.file import FileService
from services.tcgplayer import TCGPlayerService
from uuid import uuid4
from db.utils import db_transaction
from typing import List, Dict
import pandas as pd
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PricingService:
    """Load TCGplayer pricing exports into the price table and price inventory.

    The service has two responsibilities:

    * read a pricing export CSV and store one ``Price`` row per
      (product, price type) for every card that exists in ``CardTCGPlayer``;
    * build a TCGplayer inventory CSV whose ``TCG Marketplace Price`` column
      is filled by a pricing algorithm applied to the stored prices.
    """

    # Pricing-export column name -> value written to ``Price.type``.
    _PRICE_TYPES = {
        "tcg_market_price": "tcg_market_price",
        "tcg_direct_low": "tcg_direct_low",
        "tcg_low_price_with_shipping": "tcg_low_price_with_shipping",
        "tcg_low_price": "tcg_low_price",
        "tcg_marketplace_price": "listed_price",
    }

    def __init__(self, db: "Session", file_service: "FileService", tcgplayer_service: "TCGPlayerService"):
        """Store the collaborators and create the dataframe helper.

        Args:
            db: SQLAlchemy session used for all queries and writes.
            file_service: Used to read the bytes of a stored file by id.
            tcgplayer_service: Used to fetch pricing exports and inventory
                dataframes.
        """
        self.db = db
        self.file_service = file_service
        self.tcgplayer_service = tcgplayer_service
        self.df_util = DataframeUtil()

    def get_pricing_export_content(self, file: "File" = None) -> bytes:
        """Return the raw bytes of a pricing export.

        Args:
            file: An existing export file. When falsy, a fresh export for all
                products is requested from the TCGplayer service.

        Returns:
            The content of the chosen file as returned by the file service.
        """
        if not file:
            file = self.tcgplayer_service.get_pricing_export_for_all_products()
        return self.file_service.get_file_content(file.id)

    def _prices_for_batch(self, batch: pd.DataFrame) -> List["Price"]:
        """Build unsaved ``Price`` objects for one slice of the pricing export."""
        pricing_rows = [TCGPlayerPricingRow(row) for _, row in batch.iterrows()]

        # Only look up the cards referenced by this batch.
        tcgplayer_ids = [row.tcgplayer_id for row in pricing_rows]
        batch_cards = self.db.query(CardTCGPlayer).filter(
            CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids)
        ).all()
        cards_by_tcgplayer_id = {card.tcgplayer_id: card for card in batch_cards}

        new_prices = []
        for row in pricing_rows:
            card = cards_by_tcgplayer_id.get(row.tcgplayer_id)
            if card is None:
                # Export rows for cards that are not in the database are skipped.
                continue
            for col_name, price_type in self._PRICE_TYPES.items():
                value = getattr(row, col_name, None)
                # Missing, zero and negative prices are not stored; a NaN
                # value also fails the ``> 0`` comparison and is skipped.
                if value is not None and value > 0:
                    new_prices.append(
                        Price(
                            id=str(uuid4()),
                            product_id=card.product_id,
                            marketplace_id=None,
                            type=price_type,
                            price=value,
                        )
                    )
        return new_prices

    def load_pricing_csv_content_to_db(self, file_content: bytes, batch_size: int = 1000):
        """Parse a pricing export CSV and insert its prices in batches.

        Each batch is saved in its own ``db_transaction`` so a failure only
        affects the batch being written; batches saved earlier stay saved.

        Args:
            file_content: Raw CSV bytes of the pricing export.
            batch_size: Number of CSV rows processed per batch.

        Raises:
            ValueError: If ``file_content`` is empty, ``batch_size`` is not
                positive, or a required column is missing from the CSV.
            Exception: Anything raised while parsing or saving is logged and
                re-raised unchanged.
        """
        if not file_content:
            raise ValueError("No file content provided")
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        required_columns = ["tcgplayer_id"] + list(self._PRICE_TYPES.keys())
        try:
            df = self.df_util.csv_bytes_to_df(file_content)

            missing_columns = set(required_columns) - set(df.columns)
            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")

            for start in range(0, len(df), batch_size):
                new_prices = self._prices_for_batch(df.iloc[start:start + batch_size])
                if new_prices:
                    with db_transaction(self.db):
                        self.db.bulk_save_objects(new_prices)
        except Exception:
            # Record the traceback here, then let the caller decide what to do.
            logger.exception("Failed to load pricing CSV content into the price table")
            raise

    def cron_load_prices(self, file: "File" = None):
        """Fetch a pricing export and load it into the price table.

        Intended to be run on demand or on a schedule.

        Args:
            file: Optional existing export; when omitted a fresh export is
                requested (see ``get_pricing_export_content``).
        """
        file_content = self.get_pricing_export_content(file)
        self.load_pricing_csv_content_to_db(file_content)

    def get_all_prices_for_products(self, product_ids: List[str]) -> Dict[str, Dict[str, float]]:
        """Return stored prices grouped by product and price type.

        Args:
            product_ids: Product ids to look up. ``None`` and NaN entries are
                ignored.

        Returns:
            ``{product_id: {price_type: price}}``. Empty when no usable id is
            given; in that case no query is issued.
        """
        wanted_ids = [pid for pid in product_ids if pid is not None and not pd.isna(pid)]
        if not wanted_ids:
            return {}

        all_prices = self.db.query(Price).filter(
            Price.product_id.in_(wanted_ids)
        ).all()

        price_lookup: Dict[str, Dict[str, float]] = {}
        for price in all_prices:
            # NOTE(review): the query has no ORDER BY, so when several rows
            # share a product and type the last one returned wins.
            price_lookup.setdefault(price.product_id, {})[price.type] = price.price
        return price_lookup

    def apply_price_to_df_columns(self, row: pd.Series, price_lookup: Dict[str, Dict[str, float]]) -> pd.Series:
        """Copy each known price of the row's product into the row.

        Args:
            row: A dataframe row containing a ``product_id`` entry.
            price_lookup: Mapping as returned by ``get_all_prices_for_products``.

        Returns:
            The same row, with one entry per price type set for its product.
            Rows whose product has no prices are returned unchanged.
        """
        product_prices = price_lookup.get(row['product_id'], {})
        for price_type, price in product_prices.items():
            row[price_type] = price
        return row

    def default_pricing_algo(self, row: pd.Series) -> pd.Series:
        """Default pricing algorithm with complex pricing rules.

        Sets ``row['new_price']`` from ``tcg_low_price`` and
        ``tcg_low_price_with_shipping``:

        * either value missing -> ``None`` (a warning is logged);
        * low price under 0.35 -> 0.35;
        * low price or low-with-shipping under 5 -> low price * 1.25;
        * low-with-shipping over 25 -> low-with-shipping * 1.025;
        * otherwise -> low-with-shipping * 1.10.

        Computed prices are rounded to two decimals.
        """
        tcg_low = row.get('tcg_low_price')
        tcg_low_shipping = row.get('tcg_low_price_with_shipping')

        if pd.isna(tcg_low) or pd.isna(tcg_low_shipping):
            logger.warning("Missing pricing data for row: %s", row)
            row['new_price'] = None
            return row

        # Apply pricing rules; the first matching tier wins.
        if tcg_low < 0.35:
            new_price = 0.35
        elif tcg_low < 5 or tcg_low_shipping < 5:
            new_price = round(tcg_low * 1.25, 2)
        elif tcg_low_shipping > 25:
            new_price = round(tcg_low_shipping * 1.025, 2)
        else:
            new_price = round(tcg_low_shipping * 1.10, 2)

        row['new_price'] = new_price
        return row

    def apply_pricing_algo(self, row: pd.Series, pricing_algo: callable = None) -> pd.Series:
        """Run a pricing algorithm on a row.

        Args:
            row: The dataframe row to price.
            pricing_algo: Callable taking and returning a row. Defaults to
                ``default_pricing_algo`` when ``None``.

        Returns:
            Whatever the chosen algorithm returns for ``row``.
        """
        if pricing_algo is None:
            pricing_algo = self.default_pricing_algo
        return pricing_algo(row)

    def generate_tcgplayer_inventory_update_file_with_pricing(self, open_box_ids: List[str] = None) -> bytes:
        """Build a priced TCGplayer inventory CSV.

        Args:
            open_box_ids: When given, the file is built from the cards of
                these open boxes ('add' mode). Otherwise the live inventory
                is used ('update' mode): rows with a total quantity of 0 are
                dropped, and so are rows whose new price equals the stored
                listed price.

        Returns:
            CSV bytes containing the TCGplayer inventory columns, with
            ``TCG Marketplace Price`` set to the computed price.
        """
        desired_columns = [
            'TCGplayer Id', 'Product Line', 'Set Name', 'Product Name',
            'Title', 'Number', 'Rarity', 'Condition', 'TCG Market Price',
            'TCG Direct Low', 'TCG Low Price With Shipping', 'TCG Low Price',
            'Total Quantity', 'Add to Quantity', 'TCG Marketplace Price', 'Photo URL'
        ]

        if open_box_ids:
            update_type = 'add'
            df = self.tcgplayer_service.open_box_cards_to_tcgplayer_inventory_df(open_box_ids)
        else:
            update_type = 'update'
            df = self.tcgplayer_service.get_inventory_df('live')
            # remove rows with total quantity of 0
            df = df[df['total_quantity'] != 0]

        # Work on a private copy: columns are added below, and neither a
        # filtered slice nor the frame handed out by the service should be
        # written to.
        df = df.copy()

        tcgplayer_ids = df['tcgplayer_id'].unique().tolist()

        # Make a single query to get all matching records
        product_id_mapping = {}
        if tcgplayer_ids:
            product_id_mapping = {
                card.tcgplayer_id: card.product_id
                for card in self.db.query(CardTCGPlayer)
                .filter(CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids))
                .all()
            }

        # Cards without a database match get NaN as product_id.
        df['product_id'] = df['tcgplayer_id'].map(product_id_mapping)

        # Pass a plain list of real ids (no NaN, no NumPy array) to the query.
        product_ids = df['product_id'].dropna().unique().tolist()
        price_lookup = self.get_all_prices_for_products(product_ids)

        if not df.empty:
            # Apply price columns
            df = df.apply(lambda row: self.apply_price_to_df_columns(row, price_lookup), axis=1)
            # Apply pricing algorithm
            df = df.apply(self.apply_pricing_algo, axis=1)

        # The price columns are only created for rows that had a stored price
        # (and not at all for an empty frame), so make sure every column used
        # below exists before it is read.
        for column in (*self._PRICE_TYPES.values(), 'new_price'):
            if column not in df.columns:
                df[column] = float('nan')

        # if update type is update, remove rows where new_price == listed_price
        if update_type == 'update':
            df = df[df['new_price'] != df['listed_price']].copy()

        # Set marketplace price
        df['TCG Marketplace Price'] = df['new_price']

        column_mapping = {
            'tcgplayer_id': 'TCGplayer Id',
            'product_line': 'Product Line',
            'set_name': 'Set Name',
            'product_name': 'Product Name',
            'title': 'Title',
            'number': 'Number',
            'rarity': 'Rarity',
            'condition': 'Condition',
            'tcg_market_price': 'TCG Market Price',
            'tcg_direct_low': 'TCG Direct Low',
            'tcg_low_price_with_shipping': 'TCG Low Price With Shipping',
            'tcg_low_price': 'TCG Low Price',
            'total_quantity': 'Total Quantity',
            'add_to_quantity': 'Add to Quantity',
            'photo_url': 'Photo URL'
        }
        df = df.rename(columns=column_mapping)

        # Keep only the export columns, in export order.
        df = df[desired_columns]

        # remove any rows with no price
        #df = df[df['TCG Marketplace Price'] != 0]
        #df = df[df['TCG Marketplace Price'].notna()]

        # Convert to CSV bytes
        csv_bytes = self.df_util.df_to_csv_bytes(df)
        return csv_bytes