452 lines
19 KiB
Python
452 lines
19 KiB
Python
from db.models import ManaboxExportData, Box, TCGPlayerGroups, TCGPlayerInventory, TCGPlayerExportHistory, TCGPlayerPricingHistory, TCGPlayerProduct, ManaboxTCGPlayerMapping
|
|
import requests
|
|
from sqlalchemy.orm import Session
|
|
from db.utils import db_transaction
|
|
import uuid
|
|
import browser_cookie3
|
|
import webbrowser
|
|
from typing import Optional, Dict ,List
|
|
from enum import Enum
|
|
import logging
|
|
from dataclasses import dataclass
|
|
import urllib.parse
|
|
import json
|
|
from datetime import datetime
|
|
import time
|
|
import csv
|
|
from typing import List, Dict, Optional
|
|
from io import StringIO, BytesIO
|
|
from services.pricing import PricingService
|
|
from sqlalchemy.sql import exists
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class Browser(Enum):
|
|
"""Supported browser types for cookie extraction"""
|
|
BRAVE = "brave"
|
|
CHROME = "chrome"
|
|
FIREFOX = "firefox"
|
|
|
|
@dataclass
|
|
class TCGPlayerConfig:
|
|
"""Configuration for TCGPlayer API interactions"""
|
|
tcgplayer_base_url: str = "https://store.tcgplayer.com"
|
|
tcgplayer_login_path: str = "/oauth/login"
|
|
staged_inventory_download_path: str = "/Admin/Pricing/DownloadStagedInventoryExportCSV?type=Pricing"
|
|
live_inventory_download_path = "/Admin/Pricing/DownloadMyExportCSV?type=Pricing"
|
|
pricing_export_path: str = "/admin/pricing/downloadexportcsv"
|
|
max_retries: int = 1
|
|
|
|
class TCGPlayerService:
|
|
def __init__(self, db: Session,
|
|
pricing_service: PricingService,
|
|
config: TCGPlayerConfig=TCGPlayerConfig(),
|
|
browser_type: Browser=Browser.BRAVE):
|
|
self.db = db
|
|
self.config = config
|
|
self.browser_type = browser_type
|
|
self.cookies = None
|
|
self.previous_request_time = None
|
|
self.pricing_service = pricing_service
|
|
|
|
def _insert_groups(self, groups):
|
|
for group in groups:
|
|
db_group = TCGPlayerGroups(
|
|
id=str(uuid.uuid4()),
|
|
group_id=group['groupId'],
|
|
name=group['name'],
|
|
abbreviation=group['abbreviation'],
|
|
is_supplemental=group['isSupplemental'],
|
|
published_on=group['publishedOn'],
|
|
modified_on=group['modifiedOn'],
|
|
category_id=group['categoryId']
|
|
)
|
|
self.db.add(db_group)
|
|
|
|
def populate_tcgplayer_groups(self):
|
|
group_endpoint = "https://tcgcsv.com/tcgplayer/1/groups"
|
|
response = requests.get(group_endpoint)
|
|
response.raise_for_status()
|
|
groups = response.json()['results']
|
|
# manually add broken groups
|
|
groups.append({
|
|
"groupId": 2422,
|
|
"name": "Modern Horizons 2 Timeshifts",
|
|
"abbreviation": "H2R",
|
|
"isSupplemental": "false",
|
|
"publishedOn": "2018-11-08T00:00:00",
|
|
"modifiedOn": "2018-11-08T00:00:00",
|
|
"categoryId": 1
|
|
})
|
|
# Insert groups into db
|
|
with db_transaction(self.db):
|
|
self._insert_groups(groups)
|
|
|
|
def _get_browser_cookies(self) -> Optional[Dict]:
|
|
"""Retrieve cookies from the specified browser"""
|
|
try:
|
|
cookie_getter = getattr(browser_cookie3, self.browser_type.value, None)
|
|
if not cookie_getter:
|
|
raise ValueError(f"Unsupported browser type: {self.browser_type.value}")
|
|
return cookie_getter()
|
|
except Exception as e:
|
|
logger.error(f"Failed to get browser cookies: {str(e)}")
|
|
return None
|
|
|
|
def _send_request(self, url: str, method: str, data=None, except_302=False) -> requests.Response:
|
|
"""Send a request with the specified cookies"""
|
|
# if previous request was made less than 10 seconds ago, wait until current time is 10 seconds after previous request
|
|
if self.previous_request_time:
|
|
time_diff = (datetime.now() - self.previous_request_time).total_seconds()
|
|
if time_diff < 10:
|
|
logger.info(f"Waiting 10 seconds before next request...")
|
|
time.sleep(10 - time_diff)
|
|
headers = self._set_headers(method)
|
|
|
|
if not self.cookies:
|
|
self.cookies = self._get_browser_cookies()
|
|
if not self.cookies:
|
|
raise ValueError("Failed to retrieve browser cookies")
|
|
|
|
try:
|
|
#logger.info(f"debug: request url {url}, method {method}, data {data}")
|
|
response = requests.request(method, url, headers=headers, cookies=self.cookies, data=data)
|
|
response.raise_for_status()
|
|
|
|
if response.status_code == 302 and not except_302:
|
|
logger.warning("Redirecting to login page...")
|
|
self._refresh_authentication()
|
|
return self._send_request(url, method, except_302=True)
|
|
|
|
elif response.status_code == 302 and except_302:
|
|
raise ValueError("Redirected to login page after authentication refresh")
|
|
|
|
self.previous_request_time = datetime.now()
|
|
|
|
return response
|
|
|
|
except requests.RequestException as e:
|
|
logger.error(f"Request failed: {str(e)}")
|
|
return None
|
|
|
|
def _set_headers(self, method: str) -> Dict:
|
|
base_headers = {
|
|
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
|
|
'accept-language': 'en-US,en;q=0.8',
|
|
'priority': 'u=0, i',
|
|
'referer': 'https://store.tcgplayer.com/admin/pricing',
|
|
'sec-ch-ua': '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
|
|
'sec-ch-ua-mobile': '?0',
|
|
'sec-ch-ua-platform': '"macOS"',
|
|
'sec-fetch-dest': 'document',
|
|
'sec-fetch-mode': 'navigate',
|
|
'sec-fetch-site': 'same-origin',
|
|
'sec-fetch-user': '?1',
|
|
'sec-gpc': '1',
|
|
'upgrade-insecure-requests': '1',
|
|
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36'
|
|
}
|
|
|
|
if method == 'POST':
|
|
post_headers = {
|
|
'cache-control': 'max-age=0',
|
|
'content-type': 'application/x-www-form-urlencoded',
|
|
'origin': 'https://store.tcgplayer.com'
|
|
}
|
|
base_headers.update(post_headers)
|
|
|
|
return base_headers
|
|
|
|
def _set_pricing_export_payload(self, set_name_ids: List[str]) -> Dict:
|
|
data = {
|
|
"PricingType": "Pricing",
|
|
"CategoryId": "1",
|
|
"SetNameIds": set_name_ids,
|
|
"ConditionIds": ["1"],
|
|
"RarityIds": ["0"],
|
|
"LanguageIds": ["1"],
|
|
"PrintingIds": ["0"],
|
|
"CompareAgainstPrice": False,
|
|
"PriceToCompare": 3,
|
|
"ValueToCompare": 1,
|
|
"PriceValueToCompare": None,
|
|
"MyInventory": False,
|
|
"ExcludeListos": False,
|
|
"ExportLowestListingNotMe": False
|
|
}
|
|
payload = "model=" + urllib.parse.quote(json.dumps(data))
|
|
return payload
|
|
|
|
def _refresh_authentication(self) -> None:
|
|
"""Open browser for user to refresh authentication"""
|
|
login_url = f"{self.config.tcgplayer_base_url}{self.config.tcgplayer_login_path}"
|
|
logger.info("Opening browser for authentication refresh...")
|
|
webbrowser.open(login_url)
|
|
input('Please login and press Enter to continue...')
|
|
# Clear existing cookies to force refresh
|
|
self.cookies = None
|
|
|
|
def _get_inventory(self, version) -> bytes:
|
|
if version == 'staged':
|
|
inventory_download_url = f"{self.config.tcgplayer_base_url}{self.config.staged_inventory_download_path}"
|
|
elif version == 'live':
|
|
inventory_download_url = f"{self.config.tcgplayer_base_url}{self.config.live_inventory_download_path}"
|
|
else:
|
|
raise ValueError("Invalid inventory version")
|
|
response = self._send_request(inventory_download_url, 'GET')
|
|
if response:
|
|
return self._process_content(response.content)
|
|
return None
|
|
|
|
def _process_content(self, content: bytes) -> List[Dict]:
|
|
if not content:
|
|
return []
|
|
|
|
try:
|
|
text_content = content.decode('utf-8')
|
|
except UnicodeDecodeError:
|
|
for encoding in ['latin-1', 'cp1252', 'iso-8859-1']:
|
|
try:
|
|
text_content = content.decode(encoding)
|
|
break
|
|
except UnicodeDecodeError:
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
csv_file = StringIO(text_content)
|
|
try:
|
|
reader = csv.DictReader(csv_file)
|
|
inventory = [
|
|
{k: v.strip() if v else None for k, v in row.items()}
|
|
for row in reader
|
|
if any(v.strip() for v in row.values())
|
|
]
|
|
return inventory
|
|
finally:
|
|
csv_file.close()
|
|
|
|
def update_inventory(self, version: str) -> Dict:
|
|
if version not in ['staged', 'live']:
|
|
raise ValueError("Invalid inventory version")
|
|
export_id = str(uuid.uuid4())
|
|
inventory = self._get_inventory(version)
|
|
if not inventory:
|
|
return {"message": "No inventory to update"}
|
|
|
|
# add snapshot id
|
|
for item in inventory:
|
|
item['export_id'] = export_id
|
|
# check if product exists for tcgplayer_id
|
|
product_exists = self.db.query(TCGPlayerProduct).filter_by(tcgplayer_id=item['TCGplayer Id']).first()
|
|
if product_exists:
|
|
item['tcgplayer_product_id'] = product_exists.id
|
|
else:
|
|
item['tcgplayer_product_id'] = None
|
|
|
|
inventory_fields = {
|
|
'TCGplayer Id': 'tcgplayer_id',
|
|
'tcgplayer_product_id': 'tcgplayer_product_id',
|
|
'export_id': 'export_id',
|
|
'Product Line': 'product_line',
|
|
'Set Name': 'set_name',
|
|
'Product Name': 'product_name',
|
|
'Title': 'title',
|
|
'Number': 'number',
|
|
'Rarity': 'rarity',
|
|
'Condition': 'condition',
|
|
'TCG Market Price': 'tcg_market_price',
|
|
'TCG Direct Low': 'tcg_direct_low',
|
|
'TCG Low Price With Shipping': 'tcg_low_price_with_shipping',
|
|
'TCG Low Price': 'tcg_low_price',
|
|
'Total Quantity': 'total_quantity',
|
|
'Add to Quantity': 'add_to_quantity',
|
|
'TCG Marketplace Price': 'tcg_marketplace_price'
|
|
}
|
|
|
|
with db_transaction(self.db):
|
|
export_history = TCGPlayerExportHistory(
|
|
id=str(uuid.uuid4()),
|
|
type=version + '_inventory',
|
|
inventory_export_id=export_id
|
|
)
|
|
self.db.add(export_history)
|
|
for item in inventory:
|
|
db_item = TCGPlayerInventory(
|
|
id=str(uuid.uuid4()),
|
|
**{db_field: item.get(csv_field)
|
|
for csv_field, db_field in inventory_fields.items()}
|
|
)
|
|
self.db.add(db_item)
|
|
|
|
return {"message": "Inventory updated successfully", "export_id": export_id}
|
|
|
|
def _get_export_csv(self, set_name_ids: List[str]) -> bytes:
|
|
"""
|
|
Download export CSV and save to specified path
|
|
Returns True if successful, False otherwise
|
|
"""
|
|
payload = self._set_pricing_export_payload(set_name_ids)
|
|
export_csv_download_url = f"{self.config.tcgplayer_base_url}{self.config.pricing_export_path}"
|
|
response = self._send_request(export_csv_download_url, method='POST', data=payload)
|
|
csv = self._process_content(response.content)
|
|
return csv
|
|
|
|
def _update_tcgplayer_products(self):
|
|
pass
|
|
|
|
def update_pricing(self, set_name_ids: Dict[str, List[str]]) -> Dict:
|
|
export_id = str(uuid.uuid4())
|
|
product_fields = {
|
|
'TCGplayer Id': 'tcgplayer_id',
|
|
'group_id': 'group_id',
|
|
'Product Line': 'product_line',
|
|
'Set Name': 'set_name',
|
|
'Product Name': 'product_name',
|
|
'Title': 'title',
|
|
'Number': 'number',
|
|
'Rarity': 'rarity',
|
|
'Condition': 'condition'
|
|
}
|
|
pricing_fields = {
|
|
'TCGplayer Id': 'tcgplayer_id',
|
|
'tcgplayer_product_id': 'tcgplayer_product_id',
|
|
'export_id': 'export_id',
|
|
'group_id': 'group_id',
|
|
'TCG Market Price': 'tcg_market_price',
|
|
'TCG Direct Low': 'tcg_direct_low',
|
|
'TCG Low Price With Shipping': 'tcg_low_price_with_shipping',
|
|
'TCG Low Price': 'tcg_low_price',
|
|
'TCG Marketplace Price': 'tcg_marketplace_price'
|
|
}
|
|
|
|
for set_name_id in set_name_ids['set_name_ids']:
|
|
export_csv = self._get_export_csv([set_name_id])
|
|
for item in export_csv:
|
|
item['export_id'] = export_id
|
|
item['group_id'] = set_name_id
|
|
# check if product already exists
|
|
product_exists = self.db.query(TCGPlayerProduct).filter_by(tcgplayer_id=item['TCGplayer Id']).first()
|
|
if product_exists:
|
|
item['tcgplayer_product_id'] = product_exists.id
|
|
else:
|
|
with db_transaction(self.db):
|
|
product = TCGPlayerProduct(
|
|
id=str(uuid.uuid4()),
|
|
**{db_field: item.get(csv_field)
|
|
for csv_field, db_field in product_fields.items()}
|
|
)
|
|
self.db.add(product)
|
|
item['tcgplayer_product_id'] = product.id
|
|
|
|
with db_transaction(self.db):
|
|
ph_item = TCGPlayerPricingHistory(
|
|
id=str(uuid.uuid4()),
|
|
**{db_field: item.get(csv_field)
|
|
for csv_field, db_field in pricing_fields.items()}
|
|
)
|
|
self.db.add(ph_item)
|
|
|
|
|
|
with db_transaction(self.db):
|
|
export_history = TCGPlayerExportHistory(
|
|
id=str(uuid.uuid4()),
|
|
type='pricing',
|
|
pricing_export_id=export_id
|
|
)
|
|
self.db.add(export_history)
|
|
|
|
return {"message": "Pricing updated successfully"}
|
|
|
|
def update_pricing_all(self) -> Dict:
|
|
set_name_ids = self.db.query(TCGPlayerGroups.group_id).all()
|
|
set_name_ids = [str(group_id) for group_id, in set_name_ids]
|
|
return self.update_pricing({'set_name_ids': set_name_ids})
|
|
|
|
def update_pricing_for_existing_product_groups(self) -> Dict:
|
|
set_name_ids = self.db.query(TCGPlayerProduct.group_id).distinct().all()
|
|
set_name_ids = [str(group_id) for group_id, in set_name_ids]
|
|
return self.update_pricing({'set_name_ids': set_name_ids})
|
|
|
|
def tcg_set_tcg_inventory_product_relationship(self, export_id: str) -> None:
|
|
inventory_without_product = (
|
|
self.db.query(TCGPlayerInventory.tcgplayer_id, TCGPlayerInventory.set_name)
|
|
.filter(TCGPlayerInventory.total_quantity > 0)
|
|
.filter(TCGPlayerInventory.product_line == "Magic")
|
|
.filter(TCGPlayerInventory.export_id == export_id)
|
|
.filter(TCGPlayerInventory.tcgplayer_product_id.is_(None))
|
|
.filter(~exists().where(
|
|
TCGPlayerProduct.id == TCGPlayerInventory.tcgplayer_product_id
|
|
))
|
|
.all()
|
|
)
|
|
|
|
set_names = list(set(inv.set_name for inv in inventory_without_product
|
|
if inv.set_name is not None and isinstance(inv.set_name, str)))
|
|
|
|
group_ids = self.db.query(TCGPlayerGroups.group_id).filter(
|
|
TCGPlayerGroups.name.in_(set_names)
|
|
).all()
|
|
|
|
group_ids = [str(group_id[0]) for group_id in group_ids]
|
|
|
|
self.update_pricing(set_name_ids={"set_name_ids": group_ids})
|
|
|
|
for inventory in inventory_without_product:
|
|
product = self.db.query(TCGPlayerProduct).filter(
|
|
TCGPlayerProduct.tcgplayer_id == inventory.tcgplayer_id
|
|
).first()
|
|
|
|
if product:
|
|
with db_transaction(self.db):
|
|
inventory_record = self.db.query(TCGPlayerInventory).filter(
|
|
TCGPlayerInventory.tcgplayer_id == inventory.tcgplayer_id,
|
|
TCGPlayerInventory.export_id == export_id
|
|
).first()
|
|
|
|
if inventory_record:
|
|
inventory_record.tcgplayer_product_id = product.id
|
|
self.db.add(inventory_record)
|
|
|
|
|
|
def get_live_inventory_pricing_update_csv(self):
|
|
export_id = self.update_inventory("live")['export_id']
|
|
self.tcg_set_tcg_inventory_product_relationship(export_id)
|
|
self.update_pricing_for_existing_product_groups()
|
|
update_csv = self.pricing_service.create_live_inventory_pricing_update_csv()
|
|
return update_csv
|
|
|
|
def get_group_ids_for_box(self, box_id: str) -> List[str]:
|
|
# use manabox_export_data.box_id and tcgplayer_product.group_id to filter
|
|
# use manabox_tcgplayer_mapping.manabox_id and manabox_tcgplayer_mapping.tcgplayer_id to join
|
|
group_ids = self.db.query(ManaboxExportData.box_id, TCGPlayerProduct.group_id).join(
|
|
ManaboxTCGPlayerMapping, ManaboxExportData.id == ManaboxTCGPlayerMapping.manabox_id
|
|
).join(
|
|
TCGPlayerProduct, ManaboxTCGPlayerMapping.tcgplayer_id == TCGPlayerProduct.id
|
|
).filter(ManaboxExportData.box_id == box_id).all()
|
|
group_ids = list(set(str(group_id) for box_id, group_id in group_ids))
|
|
return group_ids
|
|
|
|
def get_group_ids_for_upload(self, upload_id: str) -> List[str]:
|
|
group_ids = self.db.query(ManaboxExportData.upload_id, TCGPlayerProduct.group_id).join(
|
|
ManaboxTCGPlayerMapping, ManaboxExportData.id == ManaboxTCGPlayerMapping.manabox_id
|
|
).join(
|
|
TCGPlayerProduct, ManaboxTCGPlayerMapping.tcgplayer_id == TCGPlayerProduct.id
|
|
).filter(ManaboxExportData.upload_id == upload_id).all()
|
|
group_ids = list(set(str(group_id) for upload_id, group_id in group_ids))
|
|
return group_ids
|
|
|
|
|
|
def add_to_tcgplayer(self, box_id: str = None, upload_id: str = None) :
|
|
if box_id and upload_id:
|
|
raise ValueError("Cannot provide both box_id and upload_id")
|
|
elif box_id:
|
|
group_ids = self.get_group_ids_for_box(box_id)
|
|
elif upload_id:
|
|
group_ids = self.get_group_ids_for_upload(upload_id)
|
|
else:
|
|
raise ValueError("Must provide either box_id or upload_id")
|
|
self.update_pricing({'set_name_ids': group_ids})
|
|
add_csv = self.pricing_service.create_add_to_tcgplayer_csv(box_id)
|
|
return add_csv |