import logging
from decimal import Decimal, ROUND_HALF_UP
from typing import Callable, Dict, List, Optional
from uuid import uuid4

import pandas as pd
from sqlalchemy.orm import Session

from app.db.models import File, CardTCGPlayer, Price, TCGPlayerInventory
from app.db.utils import db_transaction
from app.services.file import FileService
from app.services.tcgplayer import TCGPlayerService
from app.services.util._dataframe import TCGPlayerPricingRow, DataframeUtil

logger = logging.getLogger(__name__)


class PricingService:
    """Load TCGplayer pricing exports into the price table and price inventory.

    The service reads pricing CSV exports, stores one ``Price`` row per
    (product, price type), and produces an inventory update CSV whose
    marketplace price is computed by a pricing algorithm.
    """

    # Number of CSV rows handled (and saved) per batch when loading prices.
    _PRICE_BATCH_SIZE = 1000

    # Pricing CSV column name -> value stored in ``Price.type``.
    _PRICE_TYPES = {
        "tcg_market_price": "tcg_market_price",
        "tcg_direct_low": "tcg_direct_low",
        "tcg_low_price_with_shipping": "tcg_low_price_with_shipping",
        "tcg_low_price": "tcg_low_price",
        "tcg_marketplace_price": "listed_price",
    }

    def __init__(self, db: Session, file_service: FileService, tcgplayer_service: TCGPlayerService):
        self.db = db
        self.file_service = file_service
        self.tcgplayer_service = tcgplayer_service
        self.df_util = DataframeUtil()

    def get_pricing_export_content(self, file: Optional[File] = None) -> bytes:
        """Return the raw content of a TCGplayer pricing export.

        Can be run as needed or scheduled.

        Args:
            file: An existing export file. When omitted, a new export is
                requested through
                ``tcgplayer_service.get_pricing_export_for_all_products()``.

        Returns:
            The content returned by ``file_service.get_file_content`` for the
            file's id.
        """
        if not file:
            file = self.tcgplayer_service.get_pricing_export_for_all_products()
        return self.file_service.get_file_content(file.id)

    def load_pricing_csv_content_to_db(self, file_content: bytes):
        """Parse a pricing CSV and insert its prices into the price table.

        Rows are processed in batches of ``_PRICE_BATCH_SIZE``. Rows whose
        ``tcgplayer_id`` has no matching ``CardTCGPlayer`` record are skipped,
        and only prices that are present and greater than zero are stored.
        Each batch is saved inside its own ``db_transaction`` block.

        Args:
            file_content: CSV bytes of the pricing export.

        Raises:
            ValueError: If ``file_content`` is empty or a required column is
                missing.
            Exception: Any other error is logged and re-raised unchanged.
        """
        try:
            if not file_content:
                raise ValueError("No file content provided")

            required_columns = ["tcgplayer_id", *self._PRICE_TYPES]
            df = self.df_util.csv_bytes_to_df(file_content)

            # Validate columns
            missing_columns = set(required_columns) - set(df.columns)
            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")

            for start in range(0, len(df), self._PRICE_BATCH_SIZE):
                batch = df.iloc[start:start + self._PRICE_BATCH_SIZE]
                pricing_rows = [TCGPlayerPricingRow(row) for _, row in batch.iterrows()]

                # Query cards for this batch only
                tcgplayer_ids = [row.tcgplayer_id for row in pricing_rows]
                batch_cards = self.db.query(CardTCGPlayer).filter(
                    CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids)
                ).all()
                existing_cards = {card.tcgplayer_id: card for card in batch_cards}

                new_prices = []
                for row in pricing_rows:
                    card = existing_cards.get(row.tcgplayer_id)
                    if card is None:
                        continue
                    for col_name, price_type in self._PRICE_TYPES.items():
                        value = getattr(row, col_name, None)
                        # NaN fails the ``> 0`` comparison, so it is skipped too.
                        if value is not None and value > 0:
                            new_prices.append(
                                Price(
                                    id=str(uuid4()),
                                    product_id=card.product_id,
                                    marketplace_id=None,
                                    type=price_type,
                                    price=value,
                                )
                            )

                # Save each batch separately
                if new_prices:
                    with db_transaction(self.db):
                        self.db.bulk_save_objects(new_prices)
        except Exception:
            logger.exception("Failed to load pricing CSV content into the price table")
            raise

    def cron_load_prices(self, file: Optional[File] = None):
        """Fetch a pricing export and load both cards and prices from it.

        Cards are loaded before prices; presumably so that the price load can
        match rows to ``CardTCGPlayer`` records -- verify against
        ``TCGPlayerService.load_tcgplayer_cards``.

        Args:
            file: Optional existing export file, forwarded to
                ``get_pricing_export_content``.
        """
        file_content = self.get_pricing_export_content(file)
        self.tcgplayer_service.load_tcgplayer_cards(file_content)
        self.load_pricing_csv_content_to_db(file_content)

    def get_all_prices_for_products(self, product_ids: List[str]) -> Dict[str, Dict[str, float]]:
        """Return stored prices grouped by product id and price type.

        Args:
            product_ids: Product ids to look up. Any iterable is accepted.

        Returns:
            ``{product_id: {price_type: price}}``. When several ``Price`` rows
            share a product id and type, the last one returned by the query
            wins. Empty input yields an empty dict without querying.
        """
        product_ids = list(product_ids)
        if not product_ids:
            return {}

        all_prices = self.db.query(Price).filter(
            Price.product_id.in_(product_ids)
        ).all()

        price_lookup: Dict[str, Dict[str, float]] = {}
        for price in all_prices:
            price_lookup.setdefault(price.product_id, {})[price.type] = price.price
        return price_lookup

    def apply_price_to_df_columns(self, row: pd.Series, price_lookup: Dict[str, Dict[str, float]]) -> pd.Series:
        """Copy a product's prices onto a row, one entry per price type.

        Args:
            row: A row that contains a ``product_id`` entry.
            price_lookup: Mapping as returned by ``get_all_prices_for_products``.

        Returns:
            The same row, with ``row[price_type]`` set for every price known
            for its product. Rows with an unknown product are returned as is.
        """
        product_prices = price_lookup.get(row['product_id'], {})
        for price_type, price in product_prices.items():
            row[price_type] = price
        return row

    @staticmethod
    def _parse_quantity(value) -> int:
        """Coerce a quantity cell to ``int``, treating missing values as 0."""
        if value is None or pd.isna(value):
            return 0
        # Parse through float: quantity columns that contain NaN are float
        # typed, and int("4.0") would raise ValueError.
        return int(float(value))

    def default_pricing_algo(self, row: pd.Series) -> pd.Series:
        """Compute ``new_price`` for a row from its TCG market price.

        Rules, applied to ``tcg_market_price`` (``p``):

        * ``0.25 < p < 1``   -> ``p * 1.25``
        * ``p < 0.25``       -> ``0.25``
        * ``p < 5``          -> ``p * 1.08``
        * ``p < 10``         -> ``p * 1.06``
        * ``p < 20``         -> ``p * 1.0125``
        * ``p < 50``         -> ``p * 0.99``
        * ``p < 100``        -> ``p * 0.98``
        * otherwise          -> ``p * 1.09``

        The result is floored at 0.25, then increased by 10% when
        ``total_quantity + add_to_quantity`` exceeds 3, and finally rounded
        half-up to two decimal places.

        Args:
            row: A row that may contain ``tcg_market_price``,
                ``total_quantity`` and ``add_to_quantity``.

        Returns:
            The same row with ``new_price`` set to a float, or to ``None``
            when the market price is missing.
        """
        market_raw = row.get('tcg_market_price')
        tcg_market_price = None if pd.isna(market_raw) else Decimal(str(market_raw))

        quantity = (
            self._parse_quantity(row.get('total_quantity'))
            + self._parse_quantity(row.get('add_to_quantity'))
        )

        if tcg_market_price is None:
            logger.warning("Missing pricing data for row: %s", row)
            row['new_price'] = None
            return row

        # Define precision for rounding
        two_places = Decimal('0.01')
        minimum_price = Decimal('0.25')

        # Apply pricing rules
        # NOTE(review): a market price of exactly 0.25 matches neither of the
        # first two branches and is priced by the "< 5" tier -- confirm this
        # is intended.
        if Decimal('0.25') < tcg_market_price < Decimal('1'):
            new_price = tcg_market_price * Decimal('1.25')
        elif tcg_market_price < Decimal('0.25'):
            new_price = minimum_price
        elif tcg_market_price < Decimal('5'):
            new_price = tcg_market_price * Decimal('1.08')
        elif tcg_market_price < Decimal('10'):
            new_price = tcg_market_price * Decimal('1.06')
        elif tcg_market_price < Decimal('20'):
            new_price = tcg_market_price * Decimal('1.0125')
        elif tcg_market_price < Decimal('50'):
            new_price = tcg_market_price * Decimal('0.99')
        elif tcg_market_price < Decimal('100'):
            new_price = tcg_market_price * Decimal('0.98')
        else:
            new_price = tcg_market_price * Decimal('1.09')

        # The floor is applied before the quantity markup.
        new_price = max(new_price, minimum_price)

        if quantity > 3:
            new_price = new_price * Decimal('1.1')

        # Ensure exactly 2 decimal places
        new_price = new_price.quantize(two_places, rounding=ROUND_HALF_UP)

        # Stored as float so the dataframe column stays numeric.
        row['new_price'] = float(new_price)
        return row

    def apply_pricing_algo(
        self,
        row: pd.Series,
        pricing_algo: Optional[Callable[[pd.Series], pd.Series]] = None,
    ) -> pd.Series:
        """Apply a pricing algorithm to a row.

        Args:
            row: The row to price.
            pricing_algo: Callable taking and returning a row. Defaults to
                ``self.default_pricing_algo``.

        Returns:
            Whatever the pricing algorithm returns for ``row``.
        """
        if pricing_algo is None:
            pricing_algo = self.default_pricing_algo
        return pricing_algo(row)

    def _replace_inventory_snapshot(self, df: pd.DataFrame) -> None:
        """Replace all ``TCGPlayerInventory`` rows with the rows of ``df``."""
        # Work on a copy so the caller's dataframe keeps its CSV column names.
        snapshot = df.copy()
        snapshot['id'] = [str(uuid4()) for _ in range(len(snapshot))]
        # rename columns lowercase no space
        snapshot.columns = snapshot.columns.str.lower().str.replace(' ', '_')

        with db_transaction(self.db):
            self.db.query(TCGPlayerInventory).delete()
            self.db.flush()
            for _, row in snapshot.iterrows():
                self.db.add(TCGPlayerInventory(**row.to_dict()))

    def generate_tcgplayer_inventory_update_file_with_pricing(
        self, open_box_ids: Optional[List[str]] = None
    ) -> bytes:
        """Build a TCGplayer inventory CSV with freshly computed prices.

        With ``open_box_ids`` the file is built from the cards of those open
        boxes ("add" mode). Without it, the live inventory is used ("update"
        mode): rows with a total quantity of 0 and rows whose computed price
        equals the stored ``listed_price`` are dropped, and the
        ``TCGPlayerInventory`` table is replaced with the resulting rows.

        Args:
            open_box_ids: Open box ids to build an "add" file from. ``None``
                or an empty list selects "update" mode.

        Returns:
            The CSV produced by ``df_util.df_to_csv_bytes``.
        """
        desired_columns = [
            'TCGplayer Id', 'Product Line', 'Set Name', 'Product Name',
            'Title', 'Number', 'Rarity', 'Condition', 'TCG Market Price',
            'TCG Direct Low', 'TCG Low Price With Shipping', 'TCG Low Price',
            'Total Quantity', 'Add to Quantity', 'TCG Marketplace Price', 'Photo URL'
        ]
        column_mapping = {
            'tcgplayer_id': 'TCGplayer Id',
            'product_line': 'Product Line',
            'set_name': 'Set Name',
            'product_name': 'Product Name',
            'title': 'Title',
            'number': 'Number',
            'rarity': 'Rarity',
            'condition': 'Condition',
            'tcg_market_price': 'TCG Market Price',
            'tcg_direct_low': 'TCG Direct Low',
            'tcg_low_price_with_shipping': 'TCG Low Price With Shipping',
            'tcg_low_price': 'TCG Low Price',
            'total_quantity': 'Total Quantity',
            'add_to_quantity': 'Add to Quantity',
            'photo_url': 'Photo URL'
        }

        if open_box_ids:
            # Get initial dataframe
            update_type = 'add'
            df = self.tcgplayer_service.open_box_cards_to_tcgplayer_inventory_df(open_box_ids)
        else:
            update_type = 'update'
            df = self.tcgplayer_service.get_inventory_df('live')
            # remove rows with total quantity of 0; copy so the column
            # assignments below do not write into a slice
            df = df[df['total_quantity'] != 0].copy()

        tcgplayer_ids = df['tcgplayer_id'].unique().tolist()

        # Make a single query to get all matching records
        product_id_mapping = {
            card.tcgplayer_id: card.product_id
            for card in self.db.query(CardTCGPlayer)
            .filter(CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids))
            .all()
        }

        # Map the ids using the dictionary
        df['product_id'] = df['tcgplayer_id'].map(product_id_mapping)

        # Unmapped cards give NaN product ids; drop them and pass a plain
        # list rather than a numpy array to the query.
        product_ids = df['product_id'].dropna().unique().tolist()
        price_lookup = self.get_all_prices_for_products(product_ids)

        # Apply price columns
        df = df.apply(lambda row: self.apply_price_to_df_columns(row, price_lookup), axis=1)

        # Apply pricing algorithm
        df = df.apply(self.apply_pricing_algo, axis=1)

        # A price column only appears when at least one row received that
        # price type (and nothing is added for an empty dataframe), so make
        # sure every column used below exists.
        for column in (*self._PRICE_TYPES.values(), 'new_price'):
            if column not in df.columns:
                df[column] = None

        # if update type is update, remove rows where new_price == listed_price
        if update_type == 'update':
            df = df[df['new_price'] != df['listed_price']].copy()

        df = df.rename(columns=column_mapping)

        # Set marketplace price. 'Title' is blanked after the rename so an
        # incoming 'title' column cannot produce a duplicate 'Title' column.
        df['TCG Marketplace Price'] = df['new_price']
        df['Title'] = ''

        # Now do your column selection
        df = df[desired_columns]

        if update_type == 'update':
            self._replace_inventory_snapshot(df)

        # NOTE: rows with a missing or zero marketplace price are not
        # filtered out here.

        # Convert to CSV bytes
        csv_bytes = self.df_util.df_to_csv_bytes(df)
        return csv_bytes