From 511b070cbbcd29b4e784e9a09d58481e50e6e82f Mon Sep 17 00:00:00 2001 From: zman Date: Fri, 7 Feb 2025 13:52:28 -0500 Subject: [PATCH] pricey worky --- db/database.py | 37 ++++--- dependencies.py | 15 ++- main.py | 13 +-- services/pricing.py | 88 ++++++++++++++++- services/task.py | 33 ++----- services/tcgplayer.py | 220 +----------------------------------------- 6 files changed, 134 insertions(+), 272 deletions(-) diff --git a/db/database.py b/db/database.py index d03689b..d0b2884 100644 --- a/db/database.py +++ b/db/database.py @@ -5,8 +5,10 @@ from typing import Generator import os from sqlalchemy import inspect from services.tcgplayer import TCGPlayerService -#from services.pricing import PricingService +from services.pricing import PricingService from services.file import FileService +from db.models import Price +from datetime import datetime import logging @@ -48,6 +50,17 @@ def get_db() -> Generator[Session, None, None]: with get_db_session() as session: yield session +def prepopulate_data(db: Session, db_exist: bool = False) -> None: + file_service = FileService(db) + tcgplayer_service = TCGPlayerService(db, file_service) + pricing_service = PricingService(db, file_service, tcgplayer_service) + if not db_exist: + tcgplayer_service.populate_tcgplayer_groups() + file = tcgplayer_service.load_tcgplayer_cards() + pricing_service.cron_load_prices(file) + else: + pricing_service.cron_load_prices() + def init_db() -> None: """Initialize database tables and run first-time setup if needed""" from .models import Base @@ -57,22 +70,24 @@ def init_db() -> None: table in inspector.get_table_names() for table in Base.metadata.tables.keys() ) - # if tables_exist: - # drop all tables except file - # for table in inspector.get_table_names(): - # if table != 'files': - # Base.metadata.drop_all(bind=engine, tables=[Base.metadata.tables[table]]) - # logger.info(f"Dropped table: {table}") + if tables_exist: + with get_db_session() as db: + # get date created of latest pricing record + latest_price = db.query(Price).order_by(Price.date_created.desc()).first() + if latest_price: + # check if it is greater than 1.5 hours old + if (datetime.now() - latest_price.date_created).total_seconds() > 5400: + prepopulate_data(db, db_exist=True) + else: + prepopulate_data(db, db_exist=True) # Create tables if they don't exist Base.metadata.create_all(bind=engine) # Run first-time setup only if tables were just created if not tables_exist: -# with get_db_session() as session: -# tcgplayer_service = TCGPlayerService(session, PricingService(session), FileService(session)) -# tcgplayer_service.populate_tcgplayer_groups() -# tcgplayer_service.cron_load_prices() + with get_db_session() as db: + prepopulate_data(db) logger.info("First-time database setup completed") logger.info("Database initialization completed") diff --git a/dependencies.py b/dependencies.py index 98d6ec5..4d1b7ea 100644 --- a/dependencies.py +++ b/dependencies.py @@ -22,10 +22,6 @@ def get_file_service(db: DB) -> FileService: """FileService with only database dependency""" return FileService(db) -def get_pricing_service(db: DB) -> PricingService: - """PricingService with only database dependency""" - return PricingService(db) - def get_storage_service(db: DB) -> StorageService: """StorageService with only database dependency""" return StorageService(db) @@ -37,11 +33,14 @@ def get_inventory_service(db: DB) -> InventoryService: # Services with dependencies on other services def get_tcgplayer_service( db: DB, - pricing_service: Annotated[PricingService, Depends(get_pricing_service)], file_service: Annotated[FileService, Depends(get_file_service)] ) -> TCGPlayerService: """TCGPlayerService depends on PricingService""" - return TCGPlayerService(db, pricing_service, file_service) + return TCGPlayerService(db, file_service) + +def get_pricing_service(db: DB, file_service: Annotated[FileService, Depends(get_file_service)], tcgplayer_service: Annotated[TCGPlayerService, Depends(get_tcgplayer_service)]) -> PricingService: + """PricingService with only database dependency""" + return PricingService(db, file_service, tcgplayer_service) def get_product_service( db: DB, @@ -62,10 +61,10 @@ def get_box_service( def get_task_service( db: DB, product_service: Annotated[ProductService, Depends(get_product_service)], - tcgplayer_service: Annotated[TCGPlayerService, Depends(get_tcgplayer_service)] + pricing_service: Annotated[PricingService, Depends(get_pricing_service)] ) -> TaskService: """TaskService depends on ProductService and TCGPlayerService""" - return TaskService(db, product_service, tcgplayer_service) + return TaskService(db, product_service, pricing_service) # Form data dependencies def get_create_file_metadata( diff --git a/main.py b/main.py index 0911fa1..05dfdd2 100644 --- a/main.py +++ b/main.py @@ -65,22 +65,13 @@ async def startup_event(): db = next(get_db()) # Use dependency injection to get services - - pricing_service = get_pricing_service(db) file_service = get_file_service(db) storage_service = get_storage_service(db) - tcgplayer_service = get_tcgplayer_service(db, pricing_service, file_service) + tcgplayer_service = get_tcgplayer_service(db, file_service) + pricing_service = get_pricing_service(db, file_service, tcgplayer_service) product_service = get_product_service(db, file_service, tcgplayer_service, storage_service) task_service = get_task_service(db, product_service, tcgplayer_service) - # Initialize TCGPlayer groups if needed - if db.query(TCGPlayerGroups).count() == 0: - with db_transaction(db): - tcgplayer_service.populate_tcgplayer_groups() - - # DEBUG - tcgplayer_service.cron_load_prices() - # Start task service await task_service.start() diff --git a/services/pricing.py b/services/pricing.py index ebce2c2..7ecae0a 100644 --- a/services/pricing.py +++ b/services/pricing.py @@ -1,6 +1,92 @@ from sqlalchemy.orm import Session +from db.models import File, CardTCGPlayer, Price +from services.util._dataframe import TCGPlayerPricingRow, DataframeUtil +from services.file import FileService +from services.tcgplayer import TCGPlayerService +from uuid import uuid4 +from db.utils import db_transaction + class PricingService: - def __init__(self, db: Session): + def __init__(self, db: Session, file_service: FileService, tcgplayer_service: TCGPlayerService): self.db = db + self.file_service = file_service + self.tcgplayer_service = tcgplayer_service + self.df_util = DataframeUtil() + + # function for taking a tcgplayer pricing export with all set ids and loading it into the price table + # can be run as needed or scheduled + def get_pricing_export_content(self, file: File = None) -> bytes: + if file: + file_content = self.file_service.get_file_content(file.id) + else: + file = self.tcgplayer_service.get_pricing_export_for_all_products() + file_content = self.file_service.get_file_content(file.id) + return file_content + + def load_pricing_csv_content_to_db(self, file_content: bytes): + try: + if not file_content: + raise ValueError("No file content provided") + + price_types = { + "tcg_market_price": "tcg_market_price", + "tcg_direct_low": "tcg_direct_low", + "tcg_low_price_with_shipping": "tcg_low_price_with_shipping", + "tcg_low_price": "tcg_low_price", + "tcg_marketplace_price": "listed_price" + } + + required_columns = ["tcgplayer_id"] + list(price_types.keys()) + df = self.df_util.csv_bytes_to_df(file_content) + + # Validate columns + missing_columns = set(required_columns) - set(df.columns) + if missing_columns: + raise ValueError(f"Missing required columns: {missing_columns}") + + # Process in true batches + for i in range(0, len(df), 1000): + batch = df.iloc[i:i+1000] + pricing_rows = [TCGPlayerPricingRow(row) for _, row in batch.iterrows()] + + # Query cards for this batch only + tcgplayer_ids = [row.tcgplayer_id for row in pricing_rows] + batch_cards = self.db.query(CardTCGPlayer).filter( + CardTCGPlayer.tcgplayer_id.in_(tcgplayer_ids) + ).all() + + existing_cards = {card.tcgplayer_id: card for card in batch_cards} + + new_prices = [] + for row in pricing_rows: + if row.tcgplayer_id not in existing_cards: + continue + + card = existing_cards[row.tcgplayer_id] + row_prices = [ + Price( + id=str(uuid4()), + product_id=card.product_id, + marketplace_id=None, + type=price_type, # Added missing price_type + price=getattr(row, col_name) + ) + for col_name, price_type in price_types.items() + if getattr(row, col_name, None) is not None and getattr(row, col_name) > 0 + ] + new_prices.extend(row_prices) + + # Save each batch separately + if new_prices: + with db_transaction(self.db): + self.db.bulk_save_objects(new_prices) + + except Exception as e: + raise e # Consider adding logging here + + + def cron_load_prices(self, file: File = None): + file_content = self.get_pricing_export_content(file) + self.load_pricing_csv_content_to_db(file_content) \ No newline at end of file diff --git a/services/task.py b/services/task.py index 20c3c69..4db4f12 100644 --- a/services/task.py +++ b/services/task.py @@ -3,18 +3,18 @@ import logging from typing import Dict, Callable from sqlalchemy.orm import Session from services.product import ProductService -from services.tcgplayer import TCGPlayerService from db.models import File +from services.pricing import PricingService class TaskService: - def __init__(self, db: Session, product_service: ProductService, tcgplayer_service: TCGPlayerService): + def __init__(self, db: Session, product_service: ProductService, pricing_service: PricingService): self.scheduler = BackgroundScheduler() self.logger = logging.getLogger(__name__) self.tasks: Dict[str, Callable] = {} self.db = db self.product_service = product_service - self.tcgplayer_service = tcgplayer_service + self.tcgplayer_service = pricing_service async def start(self): self.scheduler.start() @@ -22,28 +22,13 @@ class TaskService: self.register_scheduled_tasks() def register_scheduled_tasks(self): - self.scheduler.add_job( - self.daily_report, - 'cron', - hour=0, - minute=0, - id='daily_report' - ) - -# self.scheduler.add_job( -# self.pricing_update, -# 'cron', -# minute=41, -# id='pricing_update' -# ) - - def daily_report(self): # Removed async - self.logger.info("Generating daily report") - # Daily report logic + self.scheduler.add_job(self.hourly_pricing, 'cron', minute='0') + self.logger.info("Scheduled tasks registered.") - def pricing_update(self): # Removed async - self.logger.info("Hourly pricing update") - self.tcgplayer_service.cron_load_prices() + def hourly_pricing(self): + self.logger.info("Running hourly pricing task") + self.pricing_service.cron_load_prices() + self.logger.info("Finished hourly pricing task") async def process_manabox_file(self, file: File): self.logger.info("Processing ManaBox file") diff --git a/services/tcgplayer.py b/services/tcgplayer.py index 19a3de1..f5ddcb8 100644 --- a/services/tcgplayer.py +++ b/services/tcgplayer.py @@ -18,7 +18,6 @@ import time import csv from typing import List, Dict, Optional from io import StringIO, BytesIO -from services.pricing import PricingService from sqlalchemy.sql import exists import pandas as pd from sqlalchemy.exc import SQLAlchemyError @@ -45,7 +44,6 @@ class TCGPlayerConfig: class TCGPlayerService: def __init__(self, db: Session, - pricing_service: PricingService, file_service: FileService, config: TCGPlayerConfig=TCGPlayerConfig(), browser_type: Browser=Browser.BRAVE): @@ -54,7 +52,6 @@ class TCGPlayerService: self.browser_type = browser_type self.cookies = None self.previous_request_time = None - self.pricing_service = pricing_service self.df_util = DataframeUtil() self.file_service = file_service @@ -317,166 +314,6 @@ class TCGPlayerService: else: return response.content - def _update_tcgplayer_products(self): - pass - - def update_pricing(self, set_name_ids: Dict[str, List[str]]) -> Dict: - export_id = str(uuid()) - product_fields = { - 'TCGplayer Id': 'tcgplayer_id', - 'group_id': 'group_id', - 'Product Line': 'product_line', - 'Set Name': 'set_name', - 'Product Name': 'product_name', - 'Title': 'title', - 'Number': 'number', - 'Rarity': 'rarity', - 'Condition': 'condition' - } - pricing_fields = { - 'TCGplayer Id': 'tcgplayer_id', - 'tcgplayer_product_id': 'tcgplayer_product_id', - 'export_id': 'export_id', - 'group_id': 'group_id', - 'TCG Market Price': 'tcg_market_price', - 'TCG Direct Low': 'tcg_direct_low', - 'TCG Low Price With Shipping': 'tcg_low_price_with_shipping', - 'TCG Low Price': 'tcg_low_price', - 'TCG Marketplace Price': 'tcg_marketplace_price' - } - - for set_name_id in set_name_ids['set_name_ids']: - export_csv = self._get_export_csv([set_name_id]) - for item in export_csv: - item['export_id'] = export_id - item['group_id'] = set_name_id - # check if product already exists - product_exists = self.db.query(TCGPlayerProduct).filter_by(tcgplayer_id=item['TCGplayer Id']).first() - if product_exists: - item['tcgplayer_product_id'] = product_exists.id - else: - with db_transaction(self.db): - product = TCGPlayerProduct( - id=str(uuid()), - **{db_field: item.get(csv_field) - for csv_field, db_field in product_fields.items()} - ) - self.db.add(product) - item['tcgplayer_product_id'] = product.id - - with db_transaction(self.db): - ph_item = TCGPlayerPricingHistory( - id=str(uuid()), - **{db_field: item.get(csv_field) - for csv_field, db_field in pricing_fields.items()} - ) - self.db.add(ph_item) - - - with db_transaction(self.db): - export_history = TCGPlayerExportHistory( - id=str(uuid()), - type='pricing', - pricing_export_id=export_id - ) - self.db.add(export_history) - - return {"message": "Pricing updated successfully"} - - def update_pricing_all(self) -> Dict: - set_name_ids = self.db.query(TCGPlayerGroups.group_id).all() - set_name_ids = [str(group_id) for group_id, in set_name_ids] - return self.update_pricing({'set_name_ids': set_name_ids}) - - def update_pricing_for_existing_product_groups(self) -> Dict: - set_name_ids = self.db.query(TCGPlayerProduct.group_id).distinct().all() - set_name_ids = [str(group_id) for group_id, in set_name_ids] - return self.update_pricing({'set_name_ids': set_name_ids}) - - def tcg_set_tcg_inventory_product_relationship(self, export_id: str) -> None: - inventory_without_product = ( - self.db.query(TCGPlayerInventory.tcgplayer_id, TCGPlayerInventory.set_name) - .filter(TCGPlayerInventory.total_quantity > 0) - .filter(TCGPlayerInventory.product_line == "Magic") - .filter(TCGPlayerInventory.export_id == export_id) - .filter(TCGPlayerInventory.tcgplayer_product_id.is_(None)) - .filter(~exists().where( - TCGPlayerProduct.id == TCGPlayerInventory.tcgplayer_product_id - )) - .all() - ) - - set_names = list(set(inv.set_name for inv in inventory_without_product - if inv.set_name is not None and isinstance(inv.set_name, str))) - - group_ids = self.db.query(TCGPlayerGroups.group_id).filter( - TCGPlayerGroups.name.in_(set_names) - ).all() - - group_ids = [str(group_id[0]) for group_id in group_ids] - - self.update_pricing(set_name_ids={"set_name_ids": group_ids}) - - for inventory in inventory_without_product: - product = self.db.query(TCGPlayerProduct).filter( - TCGPlayerProduct.tcgplayer_id == inventory.tcgplayer_id - ).first() - - if product: - with db_transaction(self.db): - inventory_record = self.db.query(TCGPlayerInventory).filter( - TCGPlayerInventory.tcgplayer_id == inventory.tcgplayer_id, - TCGPlayerInventory.export_id == export_id - ).first() - - if inventory_record: - inventory_record.tcgplayer_product_id = product.id - self.db.add(inventory_record) - - - def get_live_inventory_pricing_update_csv(self): - export_id = self.update_inventory("live")['export_id'] - self.tcg_set_tcg_inventory_product_relationship(export_id) - self.update_pricing_for_existing_product_groups() - # update_csv = self.pricing_service.create_live_inventory_pricing_update_csv() - update_csv = None - return update_csv - - def get_group_ids_for_box(self, box_id: str) -> List[str]: - # use manabox_export_data.box_id and tcgplayer_product.group_id to filter - # use manabox_tcgplayer_mapping.manabox_id and manabox_tcgplayer_mapping.tcgplayer_id to join - group_ids = self.db.query(ManaboxExportData.box_id, TCGPlayerProduct.group_id).join( - ManaboxTCGPlayerMapping, ManaboxExportData.id == ManaboxTCGPlayerMapping.manabox_id - ).join( - TCGPlayerProduct, ManaboxTCGPlayerMapping.tcgplayer_id == TCGPlayerProduct.id - ).filter(ManaboxExportData.box_id == box_id).all() - group_ids = list(set(str(group_id) for box_id, group_id in group_ids)) - return group_ids - - def get_group_ids_for_upload(self, upload_id: str) -> List[str]: - group_ids = self.db.query(ManaboxExportData.upload_id, TCGPlayerProduct.group_id).join( - ManaboxTCGPlayerMapping, ManaboxExportData.id == ManaboxTCGPlayerMapping.manabox_id - ).join( - TCGPlayerProduct, ManaboxTCGPlayerMapping.tcgplayer_id == TCGPlayerProduct.id - ).filter(ManaboxExportData.upload_id == upload_id).all() - group_ids = list(set(str(group_id) for upload_id, group_id in group_ids)) - return group_ids - - - def add_to_tcgplayer(self, box_id: str = None, upload_id: str = None) : - if box_id and upload_id: - raise ValueError("Cannot provide both box_id and upload_id") - elif box_id: - group_ids = self.get_group_ids_for_box(box_id) - elif upload_id: - group_ids = self.get_group_ids_for_upload(upload_id) - else: - raise ValueError("Must provide either box_id or upload_id") - self.update_pricing({'set_name_ids': group_ids}) - # add_csv = self.pricing_service.create_add_to_tcgplayer_csv(box_id) - add_csv = None - return add_csv - def create_tcgplayer_card(self, row: TCGPlayerPricingRow, group_id: int): # if card already exists, return none card_exists = self.db.query(CardTCGPlayer).filter( @@ -528,15 +365,6 @@ class TCGPlayerService: new_products = [] new_cards = [] new_tcgcards = [] - # new_prices = [] - - # price_types = { - # 'tcg_market_price': 'tcg_market_price', - # 'tcg_direct_low': 'tcg_direct_low', - # 'tcg_low_price_with_shipping': 'tcg_low_price_with_shipping', - # 'tcg_low_price': 'tcg_low_price', - # 'tcg_marketplace_price': 'tcg_marketplace_price' - #} for row in rows: # Get the correct group_id for this row's set @@ -718,7 +546,6 @@ class TCGPlayerService: # logger.error(f"No matching TCGPlayer product with correct rarity found for card {card.name} {card.rarity} {group_id} ({card.set_name} {card.collector_number})") # return None - def get_pricing_export_for_all_products(self) -> File: """ """ @@ -741,46 +568,7 @@ class TCGPlayerService: except SQLAlchemyError as e: raise RuntimeError(f"Failed to retrieve group IDs: {str(e)}") - def pricing_export_to_df(self, export_csv: bytes) -> pd.DataFrame: - """ - Converts raw CSV pricing data to a pandas DataFrame. - - Args: - export_csv (bytes): Raw CSV data in bytes format - - Returns: - pd.DataFrame: Processed pricing data - - Raises: - ValueError: If no CSV data is provided or if CSV parsing fails - """ - if not export_csv: - raise ValueError("No export CSV provided") - - csv_file = None - try: - text_content = export_csv.decode('utf-8') - csv_file = StringIO(text_content) - df = pd.read_csv(csv_file) - - if df.empty: - raise ValueError("CSV data is empty") - - return df - except UnicodeDecodeError as e: - raise ValueError(f"Failed to decode CSV data: {str(e)}") - except pd.errors.EmptyDataError: - raise ValueError("CSV file is empty or malformed") - finally: - if csv_file: - csv_file.close() - - def cron_load_prices(self) -> None: - """ - Scheduled task to load and update product prices. - Uses optimized bulk processing for better performance. - """ - logger.debug("Running cron_load_prices...") + def load_tcgplayer_cards(self) -> File: try: # Get pricing export export_csv_file = self.get_pricing_export_for_all_products() @@ -788,10 +576,8 @@ class TCGPlayerService: # load to card tcgplayer self.load_export_csv_to_card_tcgplayer(export_csv, export_csv_file.id) - - # Process the export with optimized bulk operations - # the pricing service proves that there is no god - # self.pricing_service.process_pricing_export(export_csv) + + return export_csv_file except Exception as e: logger.error(f"Failed to load prices: {e}")