227 lines
9.3 KiB
Python
227 lines
9.3 KiB
Python
import logging
from typing import Callable, Dict, List, Optional
from uuid import uuid4

import pandas as pd
from sqlalchemy.orm import Session

from app.db.models import File, CardTCGPlayer, Price
from app.db.utils import db_transaction
from app.services.file import FileService
from app.services.tcgplayer import TCGPlayerService
from app.services.util._dataframe import TCGPlayerPricingRow, DataframeUtil
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PricingService:
    """Load TCGplayer pricing exports into the price table and price inventory.

    The service reads pricing CSV exports, stores one ``Price`` row per
    (product, price type), and uses the stored prices to compute a new
    marketplace price for inventory update files.
    """

    # Maps an attribute of TCGPlayerPricingRow (one per required CSV column)
    # to the value written to ``Price.type``.
    _PRICE_TYPES = {
        "tcg_market_price": "tcg_market_price",
        "tcg_direct_low": "tcg_direct_low",
        "tcg_low_price_with_shipping": "tcg_low_price_with_shipping",
        "tcg_low_price": "tcg_low_price",
        "tcg_marketplace_price": "listed_price",
    }

    # Lowest price the default pricing algorithm will ever produce.
    _MIN_PRICE = 0.25

    def __init__(self, db: Session, file_service: FileService, tcgplayer_service: TCGPlayerService):
        """Store the collaborators and create the dataframe helper."""
        self.db = db
        self.file_service = file_service
        self.tcgplayer_service = tcgplayer_service
        self.df_util = DataframeUtil()

    def get_pricing_export_content(self, file: Optional[File] = None) -> bytes:
        """Return the content of a pricing export file.

        Args:
            file: An existing export file. When omitted (or falsy) a new
                export for all products is requested from the TCGplayer
                service.

        Returns:
            The file content as returned by the file service for ``file.id``.
        """
        if not file:
            file = self.tcgplayer_service.get_pricing_export_for_all_products()
        return self.file_service.get_file_content(file.id)

    def load_pricing_csv_content_to_db(self, file_content: bytes, batch_size: int = 1000):
        """Parse a pricing CSV and insert ``Price`` rows for known cards.

        The CSV is processed in slices of ``batch_size`` rows. Each slice is
        matched against ``CardTCGPlayer`` by ``tcgplayer_id`` and saved inside
        its own ``db_transaction`` block, so a failure part-way through does
        not cover batches that were already saved.

        Args:
            file_content: Raw CSV bytes of the pricing export.
            batch_size: Number of CSV rows handled per query/save round.

        Raises:
            ValueError: If ``file_content`` is empty, ``batch_size`` is not
                positive, or a required column is missing from the CSV.
        """
        try:
            if not file_content:
                raise ValueError("No file content provided")
            if batch_size < 1:
                raise ValueError("batch_size must be a positive integer")

            df = self.df_util.csv_bytes_to_df(file_content)

            # Validate columns before touching the database.
            required_columns = ["tcgplayer_id"] + list(self._PRICE_TYPES)
            missing_columns = set(required_columns) - set(df.columns)
            if missing_columns:
                raise ValueError(f"Missing required columns: {missing_columns}")

            for start in range(0, len(df), batch_size):
                batch = df.iloc[start:start + batch_size]
                pricing_rows = [TCGPlayerPricingRow(row) for _, row in batch.iterrows()]
                new_prices = self._build_prices(pricing_rows)

                # Save each batch separately.
                if new_prices:
                    with db_transaction(self.db):
                        self.db.bulk_save_objects(new_prices)
        except Exception:
            # Log with traceback, then re-raise unchanged so callers still
            # see the original exception type.
            logger.exception("Failed to load pricing CSV content into the price table")
            raise

    def _build_prices(self, pricing_rows: list) -> list:
        """Build ``Price`` objects for the rows whose card exists in the database."""
        tcgplayer_ids = [row.tcgplayer_id for row in pricing_rows]
        batch_cards = self.db.query(CardTCGPlayer).filter(
            CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids)
        ).all()
        cards_by_tcgplayer_id = {card.tcgplayer_id: card for card in batch_cards}

        new_prices = []
        for row in pricing_rows:
            card = cards_by_tcgplayer_id.get(row.tcgplayer_id)
            if card is None:
                # No matching card: nothing to attach a price to.
                continue
            for attr_name, price_type in self._PRICE_TYPES.items():
                value = getattr(row, attr_name, None)
                # Only positive prices are stored; None, NaN and 0 are skipped
                # (NaN > 0 is False).
                if value is not None and value > 0:
                    new_prices.append(
                        Price(
                            id=str(uuid4()),
                            product_id=card.product_id,
                            marketplace_id=None,
                            type=price_type,
                            price=value,
                        )
                    )
        return new_prices

    def cron_load_prices(self, file: Optional[File] = None):
        """Fetch a pricing export and load it into the price table.

        Takes a TCGplayer pricing export with all set ids and loads it into
        the price table. Can be run as needed or scheduled.
        """
        file_content = self.get_pricing_export_content(file)
        self.load_pricing_csv_content_to_db(file_content)

    def get_all_prices_for_products(self, product_ids: List[str]) -> Dict[str, Dict[str, float]]:
        """Return ``{product_id: {price_type: price}}`` for the given products.

        An empty (or ``None``) ``product_ids`` returns ``{}`` without querying.

        NOTE(review): the query has no explicit ordering, so when several
        ``Price`` rows share a product and type, whichever row the query
        yields last ends up in the result.
        """
        ids = list(product_ids) if product_ids is not None else []
        if not ids:
            return {}

        all_prices = self.db.query(Price).filter(
            Price.product_id.in_(ids)
        ).all()

        price_lookup: Dict[str, Dict[str, float]] = {}
        for price in all_prices:
            price_lookup.setdefault(price.product_id, {})[price.type] = price.price
        return price_lookup

    def apply_price_to_df_columns(self, row: pd.Series, price_lookup: Dict[str, Dict[str, float]]) -> pd.Series:
        """Copy every known price of the row's product onto the row.

        Each price type becomes a key on ``row``; rows whose ``product_id`` is
        not in ``price_lookup`` are returned unchanged.
        """
        product_prices = price_lookup.get(row['product_id'], {})
        for price_type, price in product_prices.items():
            row[price_type] = price
        return row

    @staticmethod
    def _default_price(tcg_low, tcg_low_shipping, tcg_market_price) -> Optional[float]:
        """Apply the default pricing rules to already-validated low prices.

        ``tcg_low`` and ``tcg_low_shipping`` must not be missing. Returns
        ``None`` when the applicable rule needs a market price and none is
        available.
        """
        min_price = PricingService._MIN_PRICE
        has_market = not pd.isna(tcg_market_price)

        if has_market and tcg_market_price <= min_price:
            # Includes the exact 0.25 boundary, which otherwise matched
            # neither market-price rule.
            new_price = min_price
        elif has_market and tcg_market_price < 1:
            new_price = round(tcg_market_price, 2)
        elif tcg_low < 5 or tcg_low_shipping < 5:
            if not has_market:
                # The midpoint rule cannot be computed without a market price.
                return None
            new_price = round(tcg_low + abs(tcg_market_price - tcg_low) / 2, 2)
        elif tcg_low_shipping > 20:
            new_price = round(tcg_low_shipping * 1.0125, 2)
        else:
            new_price = round(tcg_low_shipping * 1.08, 2)

        # If the new price is less than half of market price, set it to 85%
        # of market.
        if has_market and new_price < tcg_market_price / 2:
            new_price = round(tcg_market_price * 0.85, 2)

        return max(new_price, min_price)

    def default_pricing_algo(self, row: pd.Series) -> pd.Series:
        """Compute ``row['new_price']`` from the row's TCGplayer prices.

        Reads ``tcg_low_price``, ``tcg_low_price_with_shipping`` and
        ``tcg_market_price`` from ``row``. ``new_price`` is set to ``None``
        (with a warning) when either low price is missing, or when the market
        price is missing and the low-price midpoint rule would apply.
        Otherwise the result is never below 0.25.
        """
        tcg_low = row.get('tcg_low_price')
        tcg_low_shipping = row.get('tcg_low_price_with_shipping')
        tcg_market_price = row.get('tcg_market_price')

        if pd.isna(tcg_low) or pd.isna(tcg_low_shipping):
            new_price = None
        else:
            new_price = self._default_price(tcg_low, tcg_low_shipping, tcg_market_price)

        if new_price is None:
            logger.warning("Missing pricing data for row: %s", row)

        row['new_price'] = new_price
        return row

    def apply_pricing_algo(
        self,
        row: pd.Series,
        pricing_algo: Optional[Callable[[pd.Series], pd.Series]] = None,
    ) -> pd.Series:
        """Run ``pricing_algo`` on ``row``, defaulting to ``default_pricing_algo``."""
        if pricing_algo is None:
            pricing_algo = self.default_pricing_algo
        return pricing_algo(row)

    def generate_tcgplayer_inventory_update_file_with_pricing(self, open_box_ids: Optional[List[str]] = None) -> bytes:
        """Build a TCGplayer inventory CSV with freshly computed prices.

        With ``open_box_ids`` the file is built from the cards of those open
        boxes ("add" mode). Without it the live inventory is repriced
        ("update" mode) and rows whose computed price equals the stored
        ``listed_price`` are dropped.

        Returns:
            CSV bytes produced by the dataframe helper. When there is nothing
            to price, the CSV holds only the expected columns and no rows.
        """
        desired_columns = [
            'TCGplayer Id', 'Product Line', 'Set Name', 'Product Name',
            'Title', 'Number', 'Rarity', 'Condition', 'TCG Market Price',
            'TCG Direct Low', 'TCG Low Price With Shipping', 'TCG Low Price',
            'Total Quantity', 'Add to Quantity', 'TCG Marketplace Price', 'Photo URL',
        ]

        if open_box_ids:
            update_type = 'add'
            df = self.tcgplayer_service.open_box_cards_to_tcgplayer_inventory_df(open_box_ids)
        else:
            update_type = 'update'
            df = self.tcgplayer_service.get_inventory_df('live')
            # Remove rows with a total quantity of 0.
            # NOTE(review): applied to the live inventory only; confirm that
            # open-box frames are not meant to be filtered as well.
            df = df[df['total_quantity'] != 0]

        # Work on an independent copy so the column assignments below never
        # write into a slice of the caller's frame.
        df = df.copy()

        if df.empty:
            # Row-wise apply adds no columns to an empty frame, so the later
            # column lookups would fail; return a header-only file instead.
            return self.df_util.df_to_csv_bytes(pd.DataFrame(columns=desired_columns))

        tcgplayer_ids = df['tcgplayer_id'].unique().tolist()

        # Make a single query to get all matching records.
        matching_cards = (
            self.db.query(CardTCGPlayer)
            .filter(CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids))
            .all()
        )
        product_id_mapping = {card.tcgplayer_id: card.product_id for card in matching_cards}

        # Rows without a matching card get NaN as product_id.
        df['product_id'] = df['tcgplayer_id'].map(product_id_mapping)

        # Drop the NaN placeholders before using the ids in a query.
        product_ids = df['product_id'].dropna().unique().tolist()
        price_lookup = self.get_all_prices_for_products(product_ids)

        # Apply price columns, then the pricing algorithm.
        df = df.apply(lambda row: self.apply_price_to_df_columns(row, price_lookup), axis=1)
        df = df.apply(self.apply_pricing_algo, axis=1)

        # In update mode, skip rows whose price would not change. Without a
        # listed_price column there is nothing to compare against.
        if update_type == 'update' and 'listed_price' in df.columns:
            df = df[df['new_price'] != df['listed_price']].copy()

        # Set marketplace price.
        df['TCG Marketplace Price'] = df['new_price']

        column_mapping = {
            'tcgplayer_id': 'TCGplayer Id',
            'product_line': 'Product Line',
            'set_name': 'Set Name',
            'product_name': 'Product Name',
            'title': 'Title',
            'number': 'Number',
            'rarity': 'Rarity',
            'condition': 'Condition',
            'tcg_market_price': 'TCG Market Price',
            'tcg_direct_low': 'TCG Direct Low',
            'tcg_low_price_with_shipping': 'TCG Low Price With Shipping',
            'tcg_low_price': 'TCG Low Price',
            'total_quantity': 'Total Quantity',
            'add_to_quantity': 'Add to Quantity',
            'photo_url': 'Photo URL',
        }
        df = df.rename(columns=column_mapping)
        df = df[desired_columns]

        # NOTE(review): rows without a computed price are kept in the output;
        # confirm whether they should be filtered out before upload.
        return self.df_util.df_to_csv_bytes(df)