# giga_tcg/app/services/tcgplayer.py
# (file-listing metadata from paste: 2025-02-07 21:38:09 -05:00, 593 lines, 24 KiB, Python)
from app.db.models import TCGPlayerGroups, CardTCGPlayer, Product, Card, File, Inventory, OpenBox, OpenBoxCard
import requests
from app.services.util._dataframe import TCGPlayerPricingRow, DataframeUtil, ManaboxRow
from app.services.file import FileService
from app.services.inventory import InventoryService
from sqlalchemy.orm import Session
from app.db.utils import db_transaction
from uuid import uuid4 as uuid
import browser_cookie3
import webbrowser
from typing import Optional, Dict ,List
from enum import Enum
import logging
from dataclasses import dataclass
import urllib.parse
import json
from datetime import datetime
import time
from typing import List, Dict, Optional
import pandas as pd
from sqlalchemy.exc import SQLAlchemyError
from app.schemas.file import CreateFileRequest
import os
logger = logging.getLogger(__name__)
class Browser(Enum):
    """Browsers a TCGPlayer session cookie jar can be extracted from.

    Each value matches the name of the corresponding ``browser_cookie3``
    loader function (looked up via ``getattr``).
    """

    BRAVE = "brave"
    CHROME = "chrome"
    FIREFOX = "firefox"
@dataclass
class TCGPlayerConfig:
    """Configuration for TCGPlayer API interactions.

    Fix: ``live_inventory_download_path`` previously had no type annotation,
    which made it a plain class attribute rather than a dataclass field —
    unlike every sibling setting, it was excluded from ``__init__`` and
    ``dataclasses.fields()``.
    """
    tcgplayer_base_url: str = "https://store.tcgplayer.com"
    tcgplayer_login_path: str = "/oauth/login"
    staged_inventory_download_path: str = "/Admin/Pricing/DownloadStagedInventoryExportCSV?type=Pricing"
    live_inventory_download_path: str = "/Admin/Pricing/DownloadMyExportCSV?type=Pricing"
    pricing_export_path: str = "/admin/pricing/downloadexportcsv"
    # retry budget for requests — usage not visible in this file; TODO confirm
    max_retries: int = 1
class TCGPlayerService:
    """TCGPlayer store integration: group/card syncing, pricing exports,
    authenticated CSV downloads, and inventory dataframe generation."""

    def __init__(self, db: Session,
                 file_service: FileService,
                 config: Optional[TCGPlayerConfig] = None,
                 browser_type: Browser = Browser.BRAVE):
        """
        Args:
            db: active SQLAlchemy session.
            file_service: storage service used to persist export files.
            config: endpoint configuration; a fresh TCGPlayerConfig is built
                when omitted. (Fix: the previous ``config=TCGPlayerConfig()``
                default was evaluated once at class-definition time, so every
                instance shared a single mutable config object.)
            browser_type: browser to harvest session cookies from.
        """
        self.db = db
        self.file_service = file_service
        self.config = config if config is not None else TCGPlayerConfig()
        self.browser_type = browser_type
        self.cookies = None                 # lazily-loaded cookie jar
        self.previous_request_time = None   # last outbound request time (rate limiting)
        self.df_util = DataframeUtil()
def _insert_groups(self, groups):
    """Stage one TCGPlayerGroups row per raw group dict on the session.

    The caller is responsible for wrapping this in a transaction/commit.
    """
    for raw in groups:
        self.db.add(
            TCGPlayerGroups(
                id=str(uuid()),
                group_id=raw['groupId'],
                name=raw['name'],
                abbreviation=raw['abbreviation'],
                is_supplemental=raw['isSupplemental'],
                published_on=raw['publishedOn'],
                modified_on=raw['modifiedOn'],
                category_id=raw['categoryId'],
            )
        )
def populate_tcgplayer_groups(self):
    """Fetch all category-1 groups from tcgcsv.com, append known-broken
    groups missing from the feed, and persist everything in one transaction."""
    group_endpoint = "https://tcgcsv.com/tcgplayer/1/groups"
    response = requests.get(group_endpoint)
    response.raise_for_status()
    # Groups absent (or wrong) in the upstream feed, maintained by hand.
    manual_groups = [
        {
            "groupId": 2422,
            "name": "Modern Horizons 2 Timeshifts",
            "abbreviation": "H2R",
            "isSupplemental": "0",
            "publishedOn": "2018-11-08T00:00:00",
            "modifiedOn": "2018-11-08T00:00:00",
            "categoryId": 1
        },
        {
            "groupId": 52,
            "name": "Store Championships",
            "abbreviation": "SCH",
            "isSupplemental": "1",
            "publishedOn": "2007-07-14T00:00:00",
            "modifiedOn": "2007-07-14T00:00:00",
            "categoryId": 1
        }
    ]
    groups = response.json()['results'] + manual_groups
    with db_transaction(self.db):
        self._insert_groups(groups)
def get_cookies_from_file(self, path: str = 'cookies/tcg_cookies.json') -> Dict:
    """Load TCGPlayer session cookies from a JSON file.

    Generalization: the cookie-file location, previously hard-coded, is now
    a parameter whose default preserves the original behavior.

    Args:
        path: location of the cookie dump.

    Returns the parsed cookie dict.

    Raises:
        ValueError: when the file does not exist.
    """
    if not os.path.exists(path):
        raise ValueError("Cookies file not found")
    with open(path, 'r') as f:
        logger.debug("Loading cookies from file")
        return json.load(f)
def _get_browser_cookies(self) -> Optional[Dict]:
    """Pull a cookie jar from the configured browser via browser_cookie3.

    Returns None (after logging) when the browser type has no matching
    loader or extraction fails for any reason.
    """
    try:
        loader = getattr(browser_cookie3, self.browser_type.value, None)
        if loader is None:
            raise ValueError(f"Unsupported browser type: {self.browser_type.value}")
        return loader()
    except Exception as e:
        logger.error(f"Failed to get browser cookies: {str(e)}")
        return None
def is_in_docker(self) -> bool:
    """Detect whether this process runs inside a container.

    Checks four independent signals in order — PID 1's cgroup, the
    /.dockerenv marker file, well-known environment variables, and the
    process mount table — returning True on the first positive hit.
    """
    # Signal 1: container runtimes show up in PID 1's cgroup.
    try:
        with open('/proc/1/cgroup', 'r') as f:
            cgroup_text = f.read().lower()
        if any(marker in cgroup_text for marker in ['docker', 'containerd', 'kubepods']):
            logger.debug("Docker detected via cgroup")
            return True
    except Exception as e:
        logger.debug(f"Could not read cgroup file: {e}")
    # Signal 2: Docker drops a marker file at the filesystem root.
    if os.path.exists('/.dockerenv'):
        logger.debug("Docker detected via /.dockerenv file")
        return True
    # Signal 3: orchestrators commonly export these variables.
    env_markers = [
        'DOCKER_CONTAINER',
        'IN_DOCKER',
        'KUBERNETES_SERVICE_HOST',  # For k8s
        'DOCKER_HOST'
    ]
    if any(os.environ.get(name, False) for name in env_markers):
        logger.debug("Docker detected via environment variables")
        return True
    # Signal 4: container mounts appear in the process mount table.
    try:
        with open('/proc/self/mountinfo', 'r') as f:
            mount_text = f.read().lower()
        if any(marker in mount_text for marker in ['docker', 'containerd', 'kubernetes']):
            logger.debug("Docker detected via mountinfo")
            return True
    except Exception as e:
        logger.debug(f"Could not read mountinfo: {e}")
    logger.debug("No Docker environment detected")
    return False
def _send_request(self, url: str, method: str, data=None, except_302=False) -> requests.Response:
    """Issue an HTTP request to TCGPlayer with cached session cookies.

    Enforces a 10-second gap between requests, lazily loads cookies
    (from file inside Docker, from the local browser otherwise), and
    retries once through a login refresh when a 302 status is observed.

    Args:
        url: absolute URL to request.
        method: HTTP verb, e.g. 'GET' or 'POST'.
        data: optional request body (form-encoded string for POSTs).
        except_302: internal flag — set True on the post-refresh retry so
            a second redirect raises instead of looping forever.

    Returns the Response, or None when the request itself failed.
    """
    # Rate limiting: never send two requests less than 10s apart.
    if self.previous_request_time:
        time_diff = (datetime.now() - self.previous_request_time).total_seconds()
        if time_diff < 10:
            logger.info(f"Waiting 10 seconds before next request...")
            time.sleep(10 - time_diff)
    headers = self._set_headers(method)
    # Lazy cookie initialization; the source depends on the runtime environment.
    if not self.cookies:
        if self.is_in_docker():
            logger.debug("Running in Docker - using cookies from file")
            self.cookies = self.get_cookies_from_file()
        else:
            logger.debug("Not in Docker - using browser cookies")
            self.cookies = self._get_browser_cookies()
        if not self.cookies:
            raise ValueError("Failed to retrieve cookies")
    try:
        response = requests.request(method, url, headers=headers, cookies=self.cookies, data=data)
        response.raise_for_status()
        # NOTE(review): requests follows redirects by default and
        # raise_for_status() only raises for 4xx/5xx, so a 302 is unlikely
        # to be observed here unless allow_redirects is disabled somewhere
        # — confirm whether these branches are reachable as intended.
        if response.status_code == 302 and not except_302:
            logger.warning("Redirecting to login page...")
            self._refresh_authentication()
            return self._send_request(url, method, except_302=True)
        elif response.status_code == 302 and except_302:
            raise ValueError("Redirected to login page after authentication refresh")
        self.previous_request_time = datetime.now()
        return response
    except requests.RequestException as e:
        # NOTE(review): failures return None instead of raising; callers that
        # immediately dereference .content will hit AttributeError.
        logger.error(f"Request failed: {str(e)}")
        return None
def _set_headers(self, method: str) -> Dict:
base_headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'accept-language': 'en-US,en;q=0.8',
'priority': 'u=0, i',
'referer': 'https://store.tcgplayer.com/admin/pricing',
'sec-ch-ua': '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'sec-gpc': '1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36'
}
if method == 'POST':
post_headers = {
'cache-control': 'max-age=0',
'content-type': 'application/x-www-form-urlencoded',
'origin': 'https://store.tcgplayer.com'
}
base_headers.update(post_headers)
return base_headers
def _set_pricing_export_payload(self, set_name_ids: List[str]) -> Dict:
data = {
"PricingType": "Pricing",
"CategoryId": "1",
"SetNameIds": set_name_ids,
"ConditionIds": ["1"],
"RarityIds": ["0"],
"LanguageIds": ["1"],
"PrintingIds": ["0"],
"CompareAgainstPrice": False,
"PriceToCompare": 3,
"ValueToCompare": 1,
"PriceValueToCompare": None,
"MyInventory": False,
"ExcludeListos": False,
"ExportLowestListingNotMe": False
}
payload = "model=" + urllib.parse.quote(json.dumps(data))
return payload
def _refresh_authentication(self) -> None:
    """Send the user to the TCGPlayer login page and block until they confirm.

    Drops the cached cookie jar so the next request re-reads fresh cookies.
    """
    login_url = f"{self.config.tcgplayer_base_url}{self.config.tcgplayer_login_path}"
    logger.info("Opening browser for authentication refresh...")
    webbrowser.open(login_url)
    input('Please login and press Enter to continue...')
    self.cookies = None  # force a cookie reload on the next request
def get_inventory_df(self, version: str) -> pd.DataFrame:
    """Download the seller inventory export and parse it into a dataframe.

    Args:
        version: 'staged' or 'live' — selects which export endpoint to hit.

    Raises:
        ValueError: for any other version string.
    """
    if version == 'staged':
        path = self.config.staged_inventory_download_path
    elif version == 'live':
        path = self.config.live_inventory_download_path
    else:
        raise ValueError("Invalid inventory version")
    response = self._send_request(f"{self.config.tcgplayer_base_url}{path}", 'GET')
    return self.df_util.csv_bytes_to_df(response.content)
def _get_export_csv(self, set_name_ids: List[str]) -> bytes:
    """POST the pricing-export request for the given group ids and return
    the raw CSV bytes from the response body."""
    logger.info(f"Downloading pricing export from tcgplayer with ids {set_name_ids}")
    url = f"{self.config.tcgplayer_base_url}{self.config.pricing_export_path}"
    body = self._set_pricing_export_payload(set_name_ids)
    response = self._send_request(url, method='POST', data=body)
    return response.content
def create_tcgplayer_card(self, row: TCGPlayerPricingRow, group_id: int):
    """Create Product, Card and CardTCGPlayer rows for one pricing row.

    Returns the existing CardTCGPlayer untouched when one already matches
    (tcgplayer_id, group_id); otherwise persists all three new rows in a
    single transaction and returns the new CardTCGPlayer.
    """
    existing = (
        self.db.query(CardTCGPlayer)
        .filter(
            CardTCGPlayer.tcgplayer_id == row.tcgplayer_id,
            CardTCGPlayer.group_id == group_id,
        )
        .first()
    )
    if existing:
        return existing
    product = Product(id=str(uuid()), type='card', product_line='mtg')
    card = Card(product_id=product.id)
    tcg_card = CardTCGPlayer(
        product_id=product.id,
        group_id=group_id,
        tcgplayer_id=row.tcgplayer_id,
        product_line=row.product_line,
        set_name=row.set_name,
        product_name=row.product_name,
        title=row.title,
        number=row.number,
        rarity=row.rarity,
        condition=row.condition,
    )
    with db_transaction(self.db):
        self.db.add(product)
        self.db.add(card)
        self.db.add(tcg_card)
    return tcg_card
def create_tcgplayer_cards_batch(self, rows: list[TCGPlayerPricingRow], set_to_group: dict) -> list[CardTCGPlayer]:
    """Bulk-create Product/Card/CardTCGPlayer rows for pricing-export rows
    that do not already exist.

    Fix: removed a large block of dead commented-out Price-creation code
    that obscured the actual logic.

    Args:
        rows: parsed pricing-export rows to insert.
        set_to_group: mapping of set name -> TCGPlayer group_id; every
            row's set_name must be present (callers pre-filter).

    Returns the newly created CardTCGPlayer objects (existing ones skipped).
    """
    # One query up front to learn which (tcgplayer_id, group_id) pairs exist.
    existing_cards = {
        (card.tcgplayer_id, card.group_id): card
        for card in self.db.query(CardTCGPlayer).filter(
            CardTCGPlayer.tcgplayer_id.in_([row.tcgplayer_id for row in rows]),
            CardTCGPlayer.group_id.in_([set_to_group[row.set_name] for row in rows])
        ).all()
    }
    new_products = []
    new_cards = []
    new_tcgcards = []
    for row in rows:
        # Each row's group comes from its own set name.
        group_id = set_to_group[row.set_name]
        if (row.tcgplayer_id, group_id) in existing_cards:
            continue
        product_id = str(uuid())
        new_products.append(Product(
            id=product_id,
            type='card',
            product_line='mtg'
        ))
        new_cards.append(Card(
            product_id=product_id,
        ))
        new_tcgcards.append(CardTCGPlayer(
            product_id=product_id,
            group_id=group_id,
            tcgplayer_id=row.tcgplayer_id,
            product_line=row.product_line,
            set_name=row.set_name,
            product_name=row.product_name,
            title=row.title,
            number=row.number,
            rarity=row.rarity,
            condition=row.condition
        ))
    if new_products:
        with db_transaction(self.db):
            self.db.bulk_save_objects(new_products)
            self.db.bulk_save_objects(new_cards)
            self.db.bulk_save_objects(new_tcgcards)
    return new_tcgcards
def load_export_csv_to_card_tcgplayer(self, export_csv: bytes, file_id: str = None, batch_size: int = 1000) -> None:
    """Parse a pricing-export CSV and create CardTCGPlayer rows in batches.

    Bug fix: the previous implementation marked the File 'completed' in a
    ``finally`` block, which ran even after the ``except`` handler had just
    marked it 'failed' — every failed load was immediately overwritten as
    'completed'. Status is now set exactly once: 'failed' on error (before
    re-raising), 'completed' only on success.

    Args:
        export_csv: raw CSV bytes from the pricing export.
        file_id: optional File row to update with the load outcome.
        batch_size: number of rows handed to each batch insert.

    Raises:
        ValueError: when export_csv is empty/None.
        Exception: re-raises any processing error after marking the file failed.
    """
    try:
        if not export_csv:
            raise ValueError("No export CSV provided")
        df = self.df_util.csv_bytes_to_df(export_csv)
        logger.debug(f"Loaded {len(df)} rows from export CSV")
        # Single query for the set-name -> group-id mapping.
        set_to_group = dict(
            self.db.query(TCGPlayerGroups.name, TCGPlayerGroups.group_id).all()
        )
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i:i + batch_size]
            batch_rows = [TCGPlayerPricingRow(row) for _, row in batch_df.iterrows()]
            # Skip rows whose set is unknown — they have no group_id.
            valid_rows = [row for row in batch_rows if row.set_name in set_to_group]
            if valid_rows:
                self.create_tcgplayer_cards_batch(valid_rows, set_to_group)
    except Exception as e:
        logger.error(f"Failed to load export CSV: {e}")
        self._set_file_status(file_id, 'failed')
        raise
    else:
        self._set_file_status(file_id, 'completed')

def _set_file_status(self, file_id: Optional[str], status: str) -> None:
    """Set a File row's status in its own transaction; no-op when file_id
    is falsy or the row does not exist."""
    if not file_id:
        return
    with db_transaction(self.db):
        file = self.db.query(File).filter(File.id == file_id).first()
        if file:
            file.status = status
            self.db.add(file)
def get_card_tcgplayer_from_manabox_row(self, card: ManaboxRow, group_id: int) -> CardTCGPlayer:
    """Resolve a Manabox CSV row to its CardTCGPlayer record.

    Translates Manabox rarity and condition+foil vocabulary to TCGPlayer's,
    then matches on digits-only collector number, condition and group.
    Rarity is used as a tie-breaker, falling back to a product-name match.
    Returns None (after logging) whenever a single confident match cannot
    be found.
    """
    # Manabox rarity -> TCGPlayer single-letter rarity code.
    mb_to_tcg_rarity_mapping = {
        "common": "C",
        "uncommon": "U",
        "rare": "R",
        "mythic": "M",
        "special": "S"
    }
    # (Manabox condition, foil flag) -> TCGPlayer condition string.
    # Only near-mint combinations are supported; etched is treated as foil.
    mb_to_tcg_condition_mapping = {
        ("near_mint", "foil"): "Near Mint Foil",
        ("near_mint", "normal"): "Near Mint",
        ("near_mint", "etched"): "Near Mint Foil"
    }
    tcg_condition = mb_to_tcg_condition_mapping.get((card.condition, card.foil))
    if tcg_condition is None:
        logger.error(f"Unsupported condition/foil combination: {card.condition}, {card.foil}")
        logger.error(f"Card details: name={card.name}, set_name={card.set_name}, collector_number={card.collector_number}")
        return None
    tcg_rarity = mb_to_tcg_rarity_mapping.get(card.rarity)
    if tcg_rarity is None:
        logger.error(f"Unsupported rarity: {card.rarity}")
        logger.error(f"Card details: name={card.name}, set_name={card.set_name}, collector_number={card.collector_number}")
        return None
    # Strip letters from the collector number FOR THE JOIN ONLY — stored
    # TCGPlayer numbers appear to be digits-only.
    join_collector_number = ''.join(filter(str.isdigit, card.collector_number))
    # Query candidates without a rarity filter; rarity is applied below.
    base_query = self.db.query(CardTCGPlayer).filter(
        CardTCGPlayer.number == join_collector_number,
        CardTCGPlayer.condition == tcg_condition,
        CardTCGPlayer.group_id == group_id,
        CardTCGPlayer.rarity != "T"  # tokens are excluded; token matching unsupported
    )
    products = base_query.all()
    if not products:
        logger.error(f"No matching TCGPlayer product found for card {card.name} ({card.set_code} {card.collector_number})")
        return None
    # Prefer an exact rarity match; lands ("L") are accepted regardless of
    # the mapped rarity.
    for product in products:
        if product.rarity == "L" or product.rarity == tcg_rarity:
            return product
    # Rarity did not line up: fall back to a product-name match when the
    # candidate set is ambiguous, or accept a unique candidate outright.
    if len(products) > 1:
        for product in products:
            if product.product_name == card.name:
                return product
    elif len(products) == 1:
        return products[0]
    logger.error(f"Multiple matching TCGPlayer products found for card {card.name} ({card.set_code} {card.collector_number})")
    return None
def get_pricing_export_for_all_products(self) -> File:
    """Download a pricing export covering every known TCGPlayer group and
    persist it through the file service.

    Fixes: removed a dead ``DEBUG = False`` scaffold that could silently
    reuse a stale export, and narrowed the ``except SQLAlchemyError``
    to the group-id query it actually describes (previously download and
    file-creation errors were mislabeled "Failed to retrieve group IDs").

    Returns the created File record.

    Raises:
        RuntimeError: when group ids cannot be read from the database.
    """
    try:
        group_id_rows = self.db.query(TCGPlayerGroups.group_id).all()
    except SQLAlchemyError as e:
        raise RuntimeError(f"Failed to retrieve group IDs: {str(e)}") from e
    all_group_ids = [str(group_id) for group_id, in group_id_rows]
    export_csv = self._get_export_csv(all_group_ids)
    return self.file_service.create_file(export_csv, CreateFileRequest(
        source="tcgplayer",
        type="tcgplayer_pricing_export",
        filename="tcgplayer_pricing_export.csv"
    ))
def load_tcgplayer_cards(self) -> File:
    """Download the full pricing export and load its rows into CardTCGPlayer.

    Returns the File record for the export; logs and re-raises on failure.
    """
    try:
        export_file = self.get_pricing_export_for_all_products()
        csv_bytes = self.file_service.get_file_content(export_file.id)
        self.load_export_csv_to_card_tcgplayer(csv_bytes, export_file.id)
        return export_file
    except Exception as e:
        logger.error(f"Failed to load prices: {e}")
        raise
def open_box_cards_to_tcgplayer_inventory_df(self, open_box_ids: List[str]) -> pd.DataFrame:
    """Build a TCGPlayer "add to inventory" dataframe from opened-box cards.

    Returns None when the given open boxes contain no cards linked to a
    TCGPlayer product.
    """
    matches = (
        self.db.query(OpenBoxCard, CardTCGPlayer)
        .filter(OpenBoxCard.open_box_id.in_(open_box_ids))
        .join(CardTCGPlayer, OpenBoxCard.card_id == CardTCGPlayer.product_id)
        .all()
    )
    if not matches:
        return None
    base_columns = ['product_id', 'tcgplayer_id', 'product_line', 'set_name', 'product_name',
                    'title', 'number', 'rarity', 'condition', 'quantity']
    records = [
        (tcg.product_id, tcg.tcgplayer_id, tcg.product_line, tcg.set_name, tcg.product_name,
         tcg.title, tcg.number, tcg.rarity, tcg.condition, box_card.quantity)
        for box_card, tcg in matches
    ]
    df = pd.DataFrame(records, columns=base_columns)
    # TCGPlayer upload extras: quantity goes into "Add to Quantity";
    # the remaining columns are intentionally blank.
    df['Total Quantity'] = ''
    df['Add to Quantity'] = df['quantity']
    df['TCG Marketplace Price'] = ''
    df['Photo URL'] = ''
    # Rename to the column headers TCGPlayer's upload expects.
    return df.rename(columns={
        'tcgplayer_id': 'TCGplayer Id',
        'product_line': 'Product Line',
        'set_name': 'Set Name',
        'product_name': 'Product Name',
        'title': 'Title',
        'number': 'Number',
        'rarity': 'Rarity',
        'condition': 'Condition'
    })