from app.db.models import TCGPlayerGroups, CardTCGPlayer, Product, Card, File, Inventory, OpenBox, OpenBoxCard import requests from app.services.util._dataframe import TCGPlayerPricingRow, DataframeUtil, ManaboxRow from app.services.file import FileService from app.services.inventory import InventoryService from sqlalchemy.orm import Session from app.db.utils import db_transaction from uuid import uuid4 as uuid import browser_cookie3 import webbrowser from typing import Optional, Dict ,List from enum import Enum import logging from dataclasses import dataclass import urllib.parse import json from datetime import datetime import time from typing import List, Dict, Optional import pandas as pd from sqlalchemy.exc import SQLAlchemyError from app.schemas.file import CreateFileRequest import os from app.services.util._docker import DockerUtil from sqlalchemy import func logger = logging.getLogger(__name__) class Browser(Enum): """Supported browser types for cookie extraction""" BRAVE = "brave" CHROME = "chrome" FIREFOX = "firefox" @dataclass class TCGPlayerConfig: """Configuration for TCGPlayer API interactions""" tcgplayer_base_url: str = "https://store.tcgplayer.com" tcgplayer_login_path: str = "/oauth/login" staged_inventory_download_path: str = "/Admin/Pricing/DownloadStagedInventoryExportCSV?type=Pricing" live_inventory_download_path = "/Admin/Pricing/DownloadMyExportCSV?type=Pricing" pricing_export_path: str = "/admin/pricing/downloadexportcsv" max_retries: int = 1 class TCGPlayerService: def __init__(self, db: Session, file_service: FileService, config: TCGPlayerConfig=TCGPlayerConfig(), browser_type: Browser=Browser.BRAVE): self.db = db self.config = config self.browser_type = browser_type self.cookies = None self.previous_request_time = None self.df_util = DataframeUtil() self.file_service = file_service self.docker_util = DockerUtil() def _insert_groups(self, groups): for group in groups: db_group = TCGPlayerGroups( id=str(uuid()), group_id=group['groupId'], name=group['name'], abbreviation=group['abbreviation'], is_supplemental=group['isSupplemental'], published_on=group['publishedOn'], modified_on=group['modifiedOn'], category_id=group['categoryId'] ) self.db.add(db_group) def populate_tcgplayer_groups(self): group_endpoint = "https://tcgcsv.com/tcgplayer/1/groups" response = requests.get(group_endpoint) response.raise_for_status() groups = response.json()['results'] # manually add broken groups manual_groups = [ { "groupId": 2422, "name": "Modern Horizons 2 Timeshifts", "abbreviation": "H2R", "isSupplemental": "0", "publishedOn": "2018-11-08T00:00:00", "modifiedOn": "2018-11-08T00:00:00", "categoryId": 1 }, { "groupId": 52, "name": "Store Championships", "abbreviation": "SCH", "isSupplemental": "1", "publishedOn": "2007-07-14T00:00:00", "modifiedOn": "2007-07-14T00:00:00", "categoryId": 1 } ] groups.extend(manual_groups) # Insert groups into db with db_transaction(self.db): self._insert_groups(groups) def get_cookies_from_file(self) -> Dict: # check if cookies file exists if not os.path.exists('cookies/tcg_cookies.json'): raise ValueError("Cookies file not found") with open('cookies/tcg_cookies.json', 'r') as f: logger.debug("Loading cookies from file") cookies = json.load(f) return cookies def _get_browser_cookies(self) -> Optional[Dict]: """Retrieve cookies from the specified browser""" try: cookie_getter = getattr(browser_cookie3, self.browser_type.value, None) if not cookie_getter: raise ValueError(f"Unsupported browser type: {self.browser_type.value}") return cookie_getter() except Exception as e: logger.error(f"Failed to get browser cookies: {str(e)}") return None def _send_request(self, url: str, method: str, data=None, except_302=False) -> requests.Response: """Send a request with the specified cookies""" # Rate limiting logic if self.previous_request_time: time_diff = (datetime.now() - self.previous_request_time).total_seconds() if time_diff < 10: logger.info(f"Waiting 10 seconds before next request...") time.sleep(10 - time_diff) headers = self._set_headers(method) # Move cookie initialization outside and make it more explicit if not self.cookies: if self.docker_util.is_in_docker(): logger.debug("Running in Docker - using cookies from file") self.cookies = self.get_cookies_from_file() else: logger.debug("Not in Docker - using browser cookies") self.cookies = self._get_browser_cookies() if not self.cookies: raise ValueError("Failed to retrieve cookies") try: #logger.info(f"debug: request url {url}, method {method}, data {data}") response = requests.request(method, url, headers=headers, cookies=self.cookies, data=data) response.raise_for_status() if response.status_code == 302 and not except_302: logger.warning("Redirecting to login page...") self._refresh_authentication() return self._send_request(url, method, except_302=True) elif response.status_code == 302 and except_302: raise ValueError("Redirected to login page after authentication refresh") self.previous_request_time = datetime.now() return response except requests.RequestException as e: logger.error(f"Request failed: {str(e)}") return None def _set_headers(self, method: str) -> Dict: base_headers = { 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8', 'accept-language': 'en-US,en;q=0.8', 'priority': 'u=0, i', 'referer': 'https://store.tcgplayer.com/admin/pricing', 'sec-ch-ua': '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"macOS"', 'sec-fetch-dest': 'document', 'sec-fetch-mode': 'navigate', 'sec-fetch-site': 'same-origin', 'sec-fetch-user': '?1', 'sec-gpc': '1', 'upgrade-insecure-requests': '1', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36' } if method == 'POST': post_headers = { 'cache-control': 'max-age=0', 'content-type': 'application/x-www-form-urlencoded', 'origin': 'https://store.tcgplayer.com' } base_headers.update(post_headers) return base_headers def _set_pricing_export_payload(self, set_name_ids: List[str]) -> Dict: data = { "PricingType": "Pricing", "CategoryId": "1", "SetNameIds": set_name_ids, "ConditionIds": ["1"], "RarityIds": ["0"], "LanguageIds": ["1"], "PrintingIds": ["0"], "CompareAgainstPrice": False, "PriceToCompare": 3, "ValueToCompare": 1, "PriceValueToCompare": None, "MyInventory": False, "ExcludeListos": False, "ExportLowestListingNotMe": False } payload = "model=" + urllib.parse.quote(json.dumps(data)) return payload def _refresh_authentication(self) -> None: """Open browser for user to refresh authentication""" login_url = f"{self.config.tcgplayer_base_url}{self.config.tcgplayer_login_path}" logger.info("Opening browser for authentication refresh...") webbrowser.open(login_url) input('Please login and press Enter to continue...') # Clear existing cookies to force refresh self.cookies = None def get_inventory_df(self, version: str) -> pd.DataFrame: if version == 'staged': inventory_download_url = f"{self.config.tcgplayer_base_url}{self.config.staged_inventory_download_path}" elif version == 'live': inventory_download_url = f"{self.config.tcgplayer_base_url}{self.config.live_inventory_download_path}" else: raise ValueError("Invalid inventory version") response = self._send_request(inventory_download_url, 'GET') df = self.df_util.csv_bytes_to_df(response.content) return df def _get_export_csv(self, set_name_ids: List[str]) -> bytes: """ Download export CSV and save to specified path Returns True if successful, False otherwise """ logger.info(f"Downloading pricing export from tcgplayer with ids {set_name_ids}") payload = self._set_pricing_export_payload(set_name_ids) export_csv_download_url = f"{self.config.tcgplayer_base_url}{self.config.pricing_export_path}" response = self._send_request(export_csv_download_url, method='POST', data=payload) return response.content def create_tcgplayer_card(self, row: TCGPlayerPricingRow, group_id: int): # if card already exists, return none card_exists = self.db.query(CardTCGPlayer).filter( CardTCGPlayer.tcgplayer_id == row.tcgplayer_id, CardTCGPlayer.group_id == group_id ).first() if card_exists: return card_exists # create product product = Product( id=str(uuid()), type = 'card', product_line = 'mtg' ) # create card card = Card( product_id=product.id, ) # create Cardtcgplayer tcgcard = CardTCGPlayer( product_id=product.id, group_id=group_id, tcgplayer_id=row.tcgplayer_id, product_line=row.product_line, set_name=row.set_name, product_name=row.product_name, title=row.title, number=row.number, rarity=row.rarity, condition=row.condition ) with db_transaction(self.db): self.db.add(product) self.db.add(card) self.db.add(tcgcard) return tcgcard def create_tcgplayer_cards_batch(self, rows: list[TCGPlayerPricingRow], set_to_group: dict) -> list[CardTCGPlayer]: # Get existing cards in a single query existing_cards = { (card.tcgplayer_id, card.group_id): card for card in self.db.query(CardTCGPlayer).filter( CardTCGPlayer.tcgplayer_id.in_([row.tcgplayer_id for row in rows]), CardTCGPlayer.group_id.in_([set_to_group[row.set_name] for row in rows]) ).all() } # Pre-allocate lists for better memory efficiency new_products = [] new_cards = [] new_tcgcards = [] for row in rows: # Get the correct group_id for this row's set group_id = set_to_group[row.set_name] if (row.tcgplayer_id, group_id) in existing_cards: continue product_id = str(uuid()) new_products.append(Product( id=product_id, type='card', product_line='mtg' )) new_cards.append(Card( product_id=product_id, )) new_tcgcards.append(CardTCGPlayer( product_id=product_id, group_id=group_id, # Use the correct group_id for this specific row tcgplayer_id=row.tcgplayer_id, product_line=row.product_line, set_name=row.set_name, product_name=row.product_name, title=row.title, number=row.number, rarity=row.rarity, condition=row.condition )) # Batch create price objects # row_prices = [ # Price( # id=str(uuid()), # product_id=product_id, # marketplace_id=None, # type=price_type, # price=getattr(row, col_name) # ) # for col_name, price_type in price_types.items() # if getattr(row, col_name, None) is not None and getattr(row, col_name) > 0 # ] # new_prices.extend(row_prices) if new_products: with db_transaction(self.db): self.db.bulk_save_objects(new_products) self.db.bulk_save_objects(new_cards) self.db.bulk_save_objects(new_tcgcards) # if new_prices: # self.db.bulk_save_objects(new_prices) return new_tcgcards def load_export_csv_to_card_tcgplayer(self, export_csv: bytes, file_id: str = None, batch_size: int = 1000) -> None: try: if not export_csv: raise ValueError("No export CSV provided") df = self.df_util.csv_bytes_to_df(export_csv) logger.debug(f"Loaded {len(df)} rows from export CSV") # Get all group_ids upfront in a single query set_to_group = dict( self.db.query(TCGPlayerGroups.name, TCGPlayerGroups.group_id).all() ) # Process in batches for i in range(0, len(df), batch_size): batch_df = df.iloc[i:i + batch_size] batch_rows = [TCGPlayerPricingRow(row) for _, row in batch_df.iterrows()] # Filter rows with valid group_ids valid_rows = [ row for row in batch_rows if row.set_name in set_to_group ] # logger.debug(f"Processing batch {i // batch_size + 1}: {len(valid_rows)} valid rows") if valid_rows: # Pass the entire set_to_group mapping self.create_tcgplayer_cards_batch(valid_rows, set_to_group) except Exception as e: logger.error(f"Failed to load export CSV: {e}") # set file upload to failed if file_id: with db_transaction(self.db): file = self.db.query(File).filter(File.id == file_id).first() if file: file.status = 'failed' self.db.add(file) raise finally: if file_id: with db_transaction(self.db): file = self.db.query(File).filter(File.id == file_id).first() if file: file.status = 'completed' self.db.add(file) def get_card_tcgplayer_from_manabox_row(self, card: ManaboxRow, group_id: int) -> CardTCGPlayer: # Expanded rarity mapping mb_to_tcg_rarity_mapping = { "common": "C", "uncommon": "U", "rare": "R", "mythic": "M", "special": "S" } # Mapping from Manabox condition+foil to TCGPlayer condition mb_to_tcg_condition_mapping = { ("near_mint", "foil"): "Near Mint Foil", ("near_mint", "normal"): "Near Mint", ("near_mint", "etched"): "Near Mint Foil" } # Get TCGPlayer condition from Manabox condition+foil combination tcg_condition = mb_to_tcg_condition_mapping.get((card.condition, card.foil)) if tcg_condition is None: logger.error(f"Unsupported condition/foil combination: {card.condition}, {card.foil}") logger.error(f"Card details: name={card.name}, set_name={card.set_name}, collector_number={card.collector_number}") return None # Get TCGPlayer rarity from Manabox rarity tcg_rarity = mb_to_tcg_rarity_mapping.get(card.rarity) if tcg_rarity is None: logger.error(f"Unsupported rarity: {card.rarity}") logger.error(f"Card details: name={card.name}, set_name={card.set_name}, collector_number={card.collector_number}") return None # First query for matching products without rarity filter # debug # log everything in this query # remove letters from card.collector_number FOR JOIN ONLY join_collector_number = ''.join(filter(str.isdigit, card.collector_number)) # logger.debug(f"Querying for card: {card.name}, {card.set_code}, {card.collector_number}, {tcg_condition}, {group_id}") base_query = self.db.query(CardTCGPlayer).filter( CardTCGPlayer.number == join_collector_number, CardTCGPlayer.condition == tcg_condition, CardTCGPlayer.group_id == group_id, CardTCGPlayer.rarity != "T" # TOKENS ARE NOT SUPPORTED CUZ BROKE LOL ) # logger.debug(f"Base query: {base_query.statement.compile(compile_kwargs={'literal_binds': True})}") # Get all potential matches products = base_query.all() # If no products found, return None if not products: logger.error(f"No matching TCGPlayer product found for card {card.name} ({card.set_code} {card.collector_number})") return None # Look for an exact match including rarity, unless the TCGPlayer product is a land for product in products: if product.rarity == "L" or product.rarity == tcg_rarity: return product # ignore rarity, just make sure only one product is returned if len(products) > 1: # try to match on name before failing for product in products: if product.product_name == card.name: return product elif len(products) == 1: return products[0] logger.error(f"Multiple matching TCGPlayer products found for card {card.name} ({card.set_code} {card.collector_number})") return None # If we got here, we found products but none matched our rarity criteria # logger.error(f"No matching TCGPlayer product with correct rarity found for card {card.name} {card.rarity} {group_id} ({card.set_name} {card.collector_number})") # return None def get_pricing_export_for_all_products(self) -> File: """ """ DEBUG = False if DEBUG: logger.debug("DEBUG: Using existing pricing export file") file = self.db.query(File).filter(File.type == 'tcgplayer_pricing_export').first() if file: return file try: all_group_ids = self.db.query(TCGPlayerGroups.group_id).all() all_group_ids = [str(group_id) for group_id, in all_group_ids] export_csv = self._get_export_csv(all_group_ids) export_csv_file = self.file_service.create_file(export_csv, CreateFileRequest( source="tcgplayer", type="tcgplayer_pricing_export", filename="tcgplayer_pricing_export.csv" )) return export_csv_file except SQLAlchemyError as e: raise RuntimeError(f"Failed to retrieve group IDs: {str(e)}") def load_tcgplayer_cards(self, file_content): try: # load to card tcgplayer self.load_export_csv_to_card_tcgplayer(file_content) except Exception as e: logger.error(f"Failed to load prices: {e}") raise def open_box_cards_to_tcgplayer_inventory_df(self, open_box_ids: List[str]) -> pd.DataFrame: # Using sqlalchemy to group and sum quantities for duplicate TCGplayer IDs tcgcards = (self.db.query( CardTCGPlayer.product_id, CardTCGPlayer.tcgplayer_id, CardTCGPlayer.product_line, CardTCGPlayer.set_name, CardTCGPlayer.product_name, CardTCGPlayer.title, CardTCGPlayer.number, CardTCGPlayer.rarity, CardTCGPlayer.condition, func.sum(OpenBoxCard.quantity).label('quantity') ) .filter(OpenBoxCard.open_box_id.in_(open_box_ids)) .join(CardTCGPlayer, OpenBoxCard.card_id == CardTCGPlayer.product_id) .group_by( CardTCGPlayer.tcgplayer_id, CardTCGPlayer.product_id, CardTCGPlayer.product_line, CardTCGPlayer.set_name, CardTCGPlayer.product_name, CardTCGPlayer.title, CardTCGPlayer.number, CardTCGPlayer.rarity, CardTCGPlayer.condition ) .all()) if not tcgcards: return None # Create dataframe directly from the query results df = pd.DataFrame(tcgcards, columns=['product_id', 'tcgplayer_id', 'product_line', 'set_name', 'product_name', 'title', 'number', 'rarity', 'condition', 'quantity']) # Add empty columns df['Total Quantity'] = '' df['Add to Quantity'] = df['quantity'] df['TCG Marketplace Price'] = '' df['Photo URL'] = '' # Rename columns df = df.rename(columns={ 'tcgplayer_id': 'TCGplayer Id', 'product_line': 'Product Line', 'set_name': 'Set Name', 'product_name': 'Product Name', 'title': 'Title', 'number': 'Number', 'rarity': 'Rarity', 'condition': 'Condition' }) return df