205 lines
9.3 KiB
Python
205 lines
9.3 KiB
Python
import logging
|
|
from typing import Callable
|
|
from db.models import TCGPlayerInventory, TCGPlayerExportHistory, TCGPlayerPricingHistory, ManaboxExportData, ManaboxTCGPlayerMapping, TCGPlayerProduct
|
|
from sqlalchemy.orm import Session
|
|
import pandas as pd
|
|
from db.utils import db_transaction
|
|
from sqlalchemy import func, and_, exists
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class PricingService:
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def get_box_with_most_recent_prices(self, box_id: str) -> pd.DataFrame:
|
|
latest_prices = (
|
|
self.db.query(
|
|
TCGPlayerPricingHistory.tcgplayer_product_id,
|
|
func.max(TCGPlayerPricingHistory.date_created).label('max_date')
|
|
)
|
|
.group_by(TCGPlayerPricingHistory.tcgplayer_product_id)
|
|
.subquery('latest') # Added name to subquery
|
|
)
|
|
|
|
result = (
|
|
self.db.query(ManaboxExportData, TCGPlayerPricingHistory, TCGPlayerProduct)
|
|
.join(ManaboxTCGPlayerMapping, ManaboxExportData.id == ManaboxTCGPlayerMapping.manabox_id)
|
|
.join(TCGPlayerProduct, ManaboxTCGPlayerMapping.tcgplayer_id == TCGPlayerProduct.id)
|
|
.join(TCGPlayerPricingHistory, TCGPlayerProduct.id == TCGPlayerPricingHistory.tcgplayer_product_id)
|
|
.join(
|
|
latest_prices,
|
|
and_(
|
|
TCGPlayerPricingHistory.tcgplayer_product_id == latest_prices.c.tcgplayer_product_id,
|
|
TCGPlayerPricingHistory.date_created == latest_prices.c.max_date
|
|
)
|
|
)
|
|
.filter(ManaboxExportData.box_id == box_id) # Removed str() conversion
|
|
.all()
|
|
)
|
|
|
|
logger.debug(f"Found {len(result)} rows")
|
|
|
|
df = pd.DataFrame([{
|
|
**{f"manabox_{k}": v for k, v in row[0].__dict__.items() if not k.startswith('_')},
|
|
**{f"pricing_{k}": v for k, v in row[1].__dict__.items() if not k.startswith('_')},
|
|
**{f"tcgproduct_{k}": v for k, v in row[2].__dict__.items() if not k.startswith('_')}
|
|
} for row in result])
|
|
|
|
return df
|
|
|
|
def get_live_inventory_with_most_recent_prices(self) -> pd.DataFrame:
|
|
# Get latest export IDs using subqueries
|
|
latest_inventory_export = (
|
|
self.db.query(TCGPlayerExportHistory.inventory_export_id)
|
|
.filter(TCGPlayerExportHistory.type == "live_inventory")
|
|
.order_by(TCGPlayerExportHistory.date_created.desc())
|
|
.limit(1)
|
|
.scalar_subquery()
|
|
)
|
|
# this is bad because latest pricing export is not guaranteed to be related to the latest inventory export
|
|
latest_pricing_export = (
|
|
self.db.query(TCGPlayerExportHistory.pricing_export_id)
|
|
.filter(TCGPlayerExportHistory.type == "pricing")
|
|
.order_by(TCGPlayerExportHistory.date_created.desc())
|
|
.limit(1)
|
|
.scalar_subquery()
|
|
)
|
|
|
|
# Join inventory and pricing data in a single query
|
|
inventory_with_pricing = (
|
|
self.db.query(TCGPlayerInventory, TCGPlayerPricingHistory)
|
|
.join(
|
|
TCGPlayerPricingHistory,
|
|
TCGPlayerInventory.tcgplayer_product_id == TCGPlayerPricingHistory.tcgplayer_product_id
|
|
)
|
|
.filter(
|
|
TCGPlayerInventory.export_id == latest_inventory_export,
|
|
TCGPlayerPricingHistory.export_id == latest_pricing_export
|
|
)
|
|
.all()
|
|
)
|
|
|
|
# Convert to pandas DataFrame
|
|
df = pd.DataFrame([{
|
|
# Inventory columns
|
|
**{f"inventory_{k}": v
|
|
for k, v in row[0].__dict__.items()
|
|
if not k.startswith('_')},
|
|
# Pricing columns
|
|
**{f"pricing_{k}": v
|
|
for k, v in row[1].__dict__.items()
|
|
if not k.startswith('_')}
|
|
} for row in inventory_with_pricing])
|
|
|
|
return df
|
|
|
|
def default_pricing_algo(self, df: pd.DataFrame = None):
|
|
if df is None:
|
|
logger.debug("No DataFrame provided, fetching live inventory with most recent prices")
|
|
df = self.get_live_inventory_with_most_recent_prices()
|
|
# if tcg low price is < 0.35, set my_price to 0.35
|
|
# if either tcg low price or tcg low price with shipping is under 5, set my_price to tcg low price * 1.25
|
|
# if tcg low price with shipping is > 25 set price to tcg low price with shipping * 1.025
|
|
# otherwise, set price to tcg low price with shipping * 1.10
|
|
# also round to 2 decimal places
|
|
df['my_price'] = df.apply(lambda row: round(
|
|
0.35 if row['pricing_tcg_low_price'] < 0.35 else
|
|
row['pricing_tcg_low_price'] * 1.25 if row['pricing_tcg_low_price'] < 5 or row['pricing_tcg_low_price_with_shipping'] < 5 else
|
|
row['pricing_tcg_low_price_with_shipping'] * 1.025 if row['pricing_tcg_low_price_with_shipping'] > 25 else
|
|
row['pricing_tcg_low_price_with_shipping'] * 1.10, 2), axis=1)
|
|
# log rows with no price
|
|
no_price = df[df['my_price'].isnull()]
|
|
if len(no_price) > 0:
|
|
logger.warning(f"Found {len(no_price)} rows with no price")
|
|
logger.warning(no_price)
|
|
# remove rows with no price
|
|
df = df.dropna(subset=['my_price'])
|
|
return df
|
|
|
|
def convert_df_to_csv(self, df: pd.DataFrame):
|
|
# Flip the mapping to be from current names TO desired names
|
|
column_mapping = {
|
|
'inventory_tcgplayer_id': 'TCGplayer Id',
|
|
'inventory_product_line': 'Product Line',
|
|
'inventory_set_name': 'Set Name',
|
|
'inventory_product_name': 'Product Name',
|
|
'inventory_title': 'Title',
|
|
'inventory_number': 'Number',
|
|
'inventory_rarity': 'Rarity',
|
|
'inventory_condition': 'Condition',
|
|
'pricing_tcg_market_price': 'TCG Market Price',
|
|
'pricing_tcg_direct_low': 'TCG Direct Low',
|
|
'pricing_tcg_low_price_with_shipping': 'TCG Low Price With Shipping',
|
|
'pricing_tcg_low_price': 'TCG Low Price',
|
|
'inventory_total_quantity': 'Total Quantity',
|
|
'inventory_add_to_quantity': 'Add to Quantity',
|
|
'my_price': 'TCG Marketplace Price',
|
|
'inventory_photo_url': 'Photo URL'
|
|
}
|
|
|
|
df['pricing_tcg_market_price'] = ""
|
|
df['pricing_tcg_direct_low'] = ""
|
|
df['pricing_tcg_low_price_with_shipping'] = ""
|
|
df['pricing_tcg_low_price'] = ""
|
|
df['inventory_total_quantity'] = ""
|
|
df['inventory_add_to_quantity'] = 0
|
|
df['inventory_photo_url'] = ""
|
|
|
|
# First select the columns we want (using the keys of our mapping)
|
|
# Then rename them to the desired names (the values in our mapping)
|
|
df = df[column_mapping.keys()].rename(columns=column_mapping)
|
|
|
|
return df.to_csv(index=False, quoting=1, quotechar='"')
|
|
|
|
def convert_add_df_to_csv(self, df: pd.DataFrame):
|
|
column_mapping = {
|
|
'tcgproduct_tcgplayer_id': 'TCGplayer Id',
|
|
'tcgproduct_product_line': 'Product Line',
|
|
'tcgproduct_set_name': 'Set Name',
|
|
'tcgproduct_product_name': 'Product Name',
|
|
'tcgproduct_title': 'Title',
|
|
'tcgproduct_number': 'Number',
|
|
'tcgproduct_rarity': 'Rarity',
|
|
'tcgproduct_condition': 'Condition',
|
|
'pricing_tcg_market_price': 'TCG Market Price',
|
|
'pricing_tcg_direct_low': 'TCG Direct Low',
|
|
'pricing_tcg_low_price_with_shipping': 'TCG Low Price With Shipping',
|
|
'pricing_tcg_low_price': 'TCG Low Price',
|
|
'tcgproduct_group_id': 'Total Quantity',
|
|
'manabox_quantity': 'Add to Quantity',
|
|
'my_price': 'TCG Marketplace Price',
|
|
'tcgproduct_photo_url': 'Photo URL'
|
|
}
|
|
df['tcgproduct_group_id'] = ""
|
|
df['pricing_tcg_market_price'] = ""
|
|
df['pricing_tcg_direct_low'] = ""
|
|
df['pricing_tcg_low_price_with_shipping'] = ""
|
|
df['pricing_tcg_low_price'] = ""
|
|
df['tcgproduct_photo_url'] = ""
|
|
|
|
df = df[column_mapping.keys()].rename(columns=column_mapping)
|
|
|
|
return df.to_csv(index=False, quoting=1, quotechar='"')
|
|
|
|
def create_live_inventory_pricing_update_csv(self, algo: Callable = None) -> str:
|
|
actual_algo = algo if algo is not None else self.default_pricing_algo
|
|
df = actual_algo()
|
|
csv = self.convert_df_to_csv(df)
|
|
return csv
|
|
|
|
def create_add_to_tcgplayer_csv(self, box_id: str = None, upload_id: str = None, algo: Callable = None) -> str:
|
|
actual_algo = algo if algo is not None else self.default_pricing_algo
|
|
if box_id and upload_id:
|
|
raise ValueError("Cannot specify both box_id and upload_id")
|
|
elif not box_id and not upload_id:
|
|
raise ValueError("Must specify either box_id or upload_id")
|
|
elif box_id:
|
|
logger.debug("creating df")
|
|
df = self.get_box_with_most_recent_prices(box_id)
|
|
elif upload_id:
|
|
raise NotImplementedError("Not yet implemented")
|
|
df = actual_algo(df)
|
|
csv = self.convert_add_df_to_csv(df)
|
|
return csv |