This commit is contained in:
zman 2025-01-31 13:05:48 -05:00
parent 9f1a73f49d
commit 780b274faf
17 changed files with 1556 additions and 0 deletions

2
.gitignore vendored
View File

@ -168,3 +168,5 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# my stuff
*.db

0
__init__.py Normal file
View File

0
db/__init__.py Normal file
View File

75
db/database.py Normal file
View File

@ -0,0 +1,75 @@
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager
from typing import Generator
import os
import logging
logger = logging.getLogger(__name__)
# Get database URL from environment variable with fallback
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///omegacard.db")
# Create engine with proper configuration
engine = create_engine(
DATABASE_URL,
pool_pre_ping=True, # Enable connection health checks
pool_size=5, # Set reasonable pool size
max_overflow=10 # Allow some overflow connections
)
# Create session factory
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False
)
@contextmanager
def get_db_session() -> Generator[Session, None, None]:
"""Context manager for database sessions"""
session = SessionLocal()
try:
yield session
except Exception as e:
logger.error(f"Database session error: {str(e)}")
session.rollback()
raise
finally:
session.close()
def get_db() -> Generator[Session, None, None]:
"""Dependency for FastAPI to get database sessions"""
with get_db_session() as session:
yield session
def init_db() -> None:
"""Initialize database tables"""
from .models import Base
try:
Base.metadata.create_all(bind=engine)
logger.info("Database tables created successfully")
except Exception as e:
logger.error(f"Failed to initialize database: {str(e)}")
raise
def check_db_connection() -> bool:
"""Check if database connection is working"""
try:
with get_db_session() as session:
session.execute(text("SELECT 1"))
return True
except Exception as e:
logger.error(f"Database connection check failed: {str(e)}")
return False
def destroy_db() -> None:
"""Destroy all database tables"""
from .models import Base
try:
Base.metadata.drop_all(bind=engine)
logger.info("Database tables dropped successfully")
except Exception as e:
logger.error(f"Failed to destroy database: {str(e)}")
raise

162
db/models.py Normal file
View File

@ -0,0 +1,162 @@
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime
Base = declarative_base()
class Box(Base):
__tablename__ = "boxes"
id = Column(String, primary_key=True, index=True)
upload_id = Column(String, ForeignKey("upload_history.upload_id"))
set_name = Column(String)
set_code = Column(String)
type = Column(String)
cost = Column(Float)
date_purchased = Column(DateTime)
date_opened = Column(DateTime)
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class ManaboxExportData(Base):
__tablename__ = "manabox_export_data"
id = Column(String, primary_key=True)
upload_id = Column(String)
box_id = Column(String, nullable=True)
name = Column(String)
set_code = Column(String)
set_name = Column(String)
collector_number = Column(String)
foil = Column(String)
rarity = Column(String)
quantity = Column(Integer)
manabox_id = Column(String)
scryfall_id = Column(String)
purchase_price = Column(Float)
misprint = Column(String)
altered = Column(String)
condition = Column(String)
language = Column(String)
purchase_price_currency = Column(String)
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class UploadHistory(Base):
__tablename__ = "upload_history"
id = Column(String, primary_key=True)
upload_id = Column(String)
filename = Column(String)
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)
status = Column(String)
class TCGPlayerGroups(Base):
__tablename__ = 'tcgplayer_groups'
id = Column(String, primary_key=True)
group_id = Column(Integer)
name = Column(String)
abbreviation = Column(String)
is_supplemental = Column(String)
published_on = Column(String)
modified_on = Column(String)
category_id = Column(Integer)
class TCGPlayerInventory(Base):
__tablename__ = 'tcgplayer_inventory'
# TCGplayer Id,Product Line,Set Name,Product Name,Title,Number,Rarity,Condition,TCG Market Price,TCG Direct Low,TCG Low Price With Shipping,TCG Low Price,Total Quantity,Add to Quantity,TCG Marketplace Price,Photo URL
id = Column(String, primary_key=True)
export_id = Column(String)
tcgplayer_product_id = Column(String, ForeignKey("tcgplayer_product.id"), nullable=True)
tcgplayer_id = Column(Integer)
product_line = Column(String)
set_name = Column(String)
product_name = Column(String)
title = Column(String)
number = Column(String)
rarity = Column(String)
condition = Column(String)
tcg_market_price = Column(Float)
tcg_direct_low = Column(Float)
tcg_low_price_with_shipping = Column(Float)
tcg_low_price = Column(Float)
total_quantity = Column(Integer)
add_to_quantity = Column(Integer)
tcg_marketplace_price = Column(Float)
photo_url = Column(String)
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class TCGPlayerExportHistory(Base):
__tablename__ = 'tcgplayer_export_history'
id = Column(String, primary_key=True)
type = Column(String)
pricing_export_id = Column(String)
inventory_export_id = Column(String)
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class TCGPlayerPricingHistory(Base):
__tablename__ = 'tcgplayer_pricing_history'
id = Column(String, primary_key=True)
tcgplayer_product_id = Column(String, ForeignKey("tcgplayer_product.id"))
export_id = Column(String)
group_id = Column(Integer)
tcgplayer_id = Column(Integer)
tcg_market_price = Column(Float)
tcg_direct_low = Column(Float)
tcg_low_price_with_shipping = Column(Float)
tcg_low_price = Column(Float)
tcg_marketplace_price = Column(Float)
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class TCGPlayerProduct(Base):
__tablename__ = 'tcgplayer_product'
id = Column(String, primary_key=True)
group_id = Column(Integer)
tcgplayer_id = Column(Integer)
product_line = Column(String)
set_name = Column(String)
product_name = Column(String)
title = Column(String)
number = Column(String)
rarity = Column(String)
condition = Column(String)
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class ManaboxTCGPlayerMapping(Base):
__tablename__ = 'manabox_tcgplayer_mapping'
id = Column(String, primary_key=True)
manabox_id = Column(String, ForeignKey("manabox_export_data.id"))
tcgplayer_id = Column(Integer, ForeignKey("tcgplayer_inventory.tcgplayer_id"))
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class SetCodeGroupIdMapping(Base):
__tablename__ = 'set_code_group_id_mapping'
id = Column(String, primary_key=True)
set_code = Column(String)
group_id = Column(Integer)
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class UnmatchedManaboxData(Base):
__tablename__ = 'unmatched_manabox_data'
id = Column(String, primary_key=True)
manabox_id = Column(String, ForeignKey("manabox_export_data.id"))
reason = Column(String)
date_created = Column(DateTime, default=datetime.now)
date_modified = Column(DateTime, default=datetime.now, onupdate=datetime.now)

23
db/utils.py Normal file
View File

@ -0,0 +1,23 @@
from contextlib import contextmanager
from sqlalchemy.orm import Session
from exceptions import FailedUploadException
import logging
logger = logging.getLogger(__name__)
@contextmanager
def db_transaction(db: Session):
"""Simple context manager for database transactions"""
try:
yield
db.commit()
except FailedUploadException as failed_upload:
logger.error(f"Failed upload: {str(failed_upload.message)}")
db.rollback()
db.add(failed_upload.file_upload_record)
db.commit()
raise
except Exception as e:
logger.error(f"Database error: {str(e)}")
db.rollback()
raise

45
dependencies.py Normal file
View File

@ -0,0 +1,45 @@
from sqlalchemy.orm import Session
from services.data import DataService
from services.upload import UploadService
from services.box import BoxService
from services.tcgplayer import TCGPlayerService
from services.pricing import PricingService
from fastapi import Depends
from db.database import get_db
## Upload
def get_upload_service(db: Session = Depends(get_db)) -> UploadService:
"""Dependency injection for UploadService"""
return UploadService(db)
## box
def get_box_service(db: Session = Depends(get_db)) -> BoxService:
"""Dependency injection for BoxService"""
return BoxService(db)
## Pricing
def get_pricing_service(db: Session = Depends(get_db)) -> PricingService:
"""Dependency injection for PricingService"""
return PricingService(db)
## tcgplayer
def get_tcgplayer_service(
db: Session = Depends(get_db),
pricing_service: PricingService = Depends(get_pricing_service)
) -> TCGPlayerService:
"""Dependency injection for TCGPlayerService"""
return TCGPlayerService(db, pricing_service)
## Data
def get_data_service(
db: Session = Depends(get_db),
tcgplayer_service: TCGPlayerService = Depends(get_tcgplayer_service)
) -> DataService:
"""Dependency injection for DataService"""
return DataService(db, tcgplayer_service)

4
exceptions.py Normal file
View File

@ -0,0 +1,4 @@
class FailedUploadException(Exception):
def __init__(self, file_upload_record, message="Failed to update Manabox data"):
self.file_upload_record = file_upload_record
self.message = message

75
main.py Normal file
View File

@ -0,0 +1,75 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from routes.routes import router
from db.database import init_db, check_db_connection, destroy_db, get_db
from db.utils import db_transaction
import logging
import sys
from services.tcgplayer import TCGPlayerService, PricingService
from db.models import TCGPlayerGroups
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('app.log') # Added this line
]
)
logger = logging.getLogger(__name__)
# Create FastAPI instance
app = FastAPI(
title="Card Management API",
description="API for managing card collections and TCGPlayer integration",
version="1.0.0",
debug=True
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Modify this in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(router)
# Optional: Add startup and shutdown events
@app.on_event("startup")
async def startup_event():
# Check database connection
if not check_db_connection():
raise Exception("Database connection failed")
# destroy db
#destroy_db()
# Initialize database
init_db()
# get db session
db = next(get_db())
# populate tcgplayer groups
if db.query(TCGPlayerGroups).count() == 0:
with db_transaction(db):
tcgplayer_service = TCGPlayerService(db, PricingService(db))
tcgplayer_service.populate_tcgplayer_groups()
@app.on_event("shutdown")
async def shutdown_event():
# Clean up any connections or resources
pass
# Root endpoint
@app.get("/")
async def root():
return {"message": "Card Management API"}
# Run the application
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

0
routes/__init__.py Normal file
View File

167
routes/routes.py Normal file
View File

@ -0,0 +1,167 @@
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Request, BackgroundTasks
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from typing import Dict, Any, List
from db.database import get_db
from services.upload import UploadService
from services.box import BoxService
from services.tcgplayer import TCGPlayerService
from services.data import DataService
from dependencies import get_data_service, get_upload_service, get_tcgplayer_service, get_box_service
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["cards"])
## health check
@router.get("/health", response_model=dict)
async def health_check() -> dict:
"""
Health check endpoint
"""
logger.info("Health check")
return {"status": "ok"}
## test endpoint - logs all detail about request
@router.post("/test", response_model=dict)
async def test_endpoint(request: Request, file:UploadFile = File(...)) -> dict:
"""
Test endpoint
"""
content = await file.read()
# log filename
logger.info(f"file received: {file.filename}")
# print first 100 characters of file content
logger.info(f"file content: {content[:100]}")
return {"status": "ok"}
@router.post("/upload/manabox", response_model=dict)
async def upload_manabox(
background_tasks: BackgroundTasks,
upload_service: UploadService = Depends(get_upload_service),
data_service: DataService = Depends(get_data_service),
file: UploadFile = File(...)
) -> dict:
"""
Upload endpoint for Manabox CSV files
"""
try:
logger.info(f"file received: {file.filename}")
# Read the file content
content = await file.read()
filename = file.filename
if not content:
logger.error("Empty file content")
raise HTTPException(status_code=400, detail="Empty file content")
# You might want to validate it's a CSV file
if not file.filename.endswith('.csv'):
logger.error("File must be a CSV")
raise HTTPException(status_code=400, detail="File must be a CSV")
result = upload_service.process_manabox_upload(content, filename)
background_tasks.add_task(data_service.bg_set_manabox_tcg_relationship, upload_id=result[1])
return result[0]
except Exception as e:
logger.error(f"Manabox upload failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
@router.post("/createBox", response_model=dict)
async def create_box(
upload_id: str,
box_service: BoxService = Depends(get_box_service)
) -> dict:
try:
result = box_service.convert_upload_to_boxes(upload_id)
except Exception as e:
logger.error(f"Box creation failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
return result
@router.post("/deleteBox", response_model=dict)
async def delete_box(
box_id: str,
box_service: BoxService = Depends(get_box_service)
) -> dict:
try:
result = box_service.delete_box(box_id)
except Exception as e:
logger.error(f"Box deletion failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
return result
@router.post("/tcgplayer/add/box/{box_id}", response_model=dict)
async def add_box(box_id: str = None, tcgplayer_service: TCGPlayerService = Depends(get_tcgplayer_service)):
try:
csv_content = tcgplayer_service.add_to_tcgplayer(box_id)
return StreamingResponse(
iter([csv_content]),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=add_to_tcgplayer.csv"}
)
except Exception as e:
logger.error(f"Box add failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
@router.post("/tcgplayer/update/box/{box_id}", response_model=dict)
async def update_box(box_id: int = None):
"""asdf"""
pass
@router.post("/tcgplayer/updateInventory", response_model=dict)
async def update_inventory(
background_tasks: BackgroundTasks,
tcgplayer_service: TCGPlayerService = Depends(get_tcgplayer_service),
data_service: DataService = Depends(get_data_service)):
try:
result = tcgplayer_service.update_inventory('live')
export_id = result['export_id']
background_tasks.add_task(data_service.bg_set_tcg_inventory_product_relationship, export_id)
return result
except Exception as e:
logger.error(f"Inventory update failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
@router.post("/tcgplayer/updatePricing", response_model=dict)
async def update_inventory(
tcgplayer_service: TCGPlayerService = Depends(get_tcgplayer_service),
group_ids: Dict = None):
try:
result = tcgplayer_service.update_pricing(group_ids)
return result
except Exception as e:
logger.error(f"Pricing update failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
@router.post("/tcgplayer/updatePricingAll", response_model=dict)
async def update_inventory(tcgplayer_service: TCGPlayerService = Depends(get_tcgplayer_service)):
try:
result = tcgplayer_service.update_pricing_all()
return result
except Exception as e:
logger.error(f"Pricing update failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
@router.get("/tcgplayer/createLiveInventoryPricingUpdateFile")
async def create_inventory_import(
tcgplayer_service: TCGPlayerService = Depends(get_tcgplayer_service)
):
try:
csv_content = tcgplayer_service.get_live_inventory_pricing_update_csv()
return StreamingResponse(
iter([csv_content]),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=inventory_pricing_update.csv"}
)
except Exception as e:
logger.error(f"Inventory import creation failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))

0
services/__init__.py Normal file
View File

100
services/box.py Normal file
View File

@ -0,0 +1,100 @@
from db.models import ManaboxExportData, Box, UploadHistory
from db.utils import db_transaction
import uuid
from datetime import datetime
from sqlalchemy.orm import Session
from sqlalchemy.engine.result import Row
import logging
logger = logging.getLogger(__name__)
class BoxObject:
def __init__(
self, upload_id: str, set_name: str,
set_code: str, cost: float = None, date_purchased: datetime = None,
date_opened: datetime = None, box_id: str = None):
self.upload_id = upload_id
self.box_id = box_id if box_id else str(uuid.uuid4())
self.set_name = set_name
self.set_code = set_code
self.cost = cost
self.date_purchased = date_purchased
self.date_opened = date_opened
class BoxService:
def __init__(self, db: Session):
self.db = db
def _validate_upload_id(self, upload_id: str):
# check if upload_history status = 'success'
if self.db.query(UploadHistory).filter(UploadHistory.upload_id == upload_id).first() is None:
raise Exception(f"Upload ID {upload_id} not found")
if self.db.query(UploadHistory).filter(UploadHistory.upload_id == upload_id).first().status != 'success':
raise Exception(f"Upload ID {upload_id} not successful")
# check if at least 1 row in manabox_export_data with upload_id
if self.db.query(ManaboxExportData).filter(ManaboxExportData.upload_id == upload_id).first() is None:
raise Exception(f"Upload ID {upload_id} has no data")
def _get_set_info(self, upload_id: str) -> list[Row[tuple[str, str]]]:
# get distinct set_name, set_code from manabox_export_data for upload_id
boxes = self.db.query(ManaboxExportData.set_name, ManaboxExportData.set_code).filter(ManaboxExportData.upload_id == upload_id).distinct().all()
if not boxes or len(boxes) == 0:
raise Exception(f"Upload ID {upload_id} has no data")
return boxes
def _update_manabox_export_data_box_id(self, box: Box):
# based on upload_id, set_name, set_code, update box_id in manabox_export_data for all rows where box id is null
with db_transaction(self.db):
self.db.query(ManaboxExportData).filter(
ManaboxExportData.upload_id == box.upload_id).filter(
ManaboxExportData.set_name == box.set_name).filter(
ManaboxExportData.set_code == box.set_code).filter(
ManaboxExportData.box_id == None).update({ManaboxExportData.box_id: box.id})
def convert_upload_to_boxes(self, upload_id: str):
self._validate_upload_id(upload_id)
# get distinct set_name, set_code from manabox_export_data for upload_id
box_set_info = self._get_set_info(upload_id)
created_boxes = []
# create boxes
for box in box_set_info:
box_obj = BoxObject(upload_id, set_name = box.set_name, set_code = box.set_code)
new_box = self.create_box(box_obj)
logger.info(f"Created box {new_box.id} for upload {upload_id}")
self._update_manabox_export_data_box_id(new_box)
created_boxes.append(new_box)
return {"status": "success", "boxes": f"{[box.id for box in created_boxes]}"}
def create_box(self, box: BoxObject):
with db_transaction(self.db):
box_record = Box(
id = box.box_id,
upload_id = box.upload_id,
set_name = box.set_name,
set_code = box.set_code,
cost = box.cost,
date_purchased = box.date_purchased,
date_opened = box.date_opened
)
self.db.add(box_record)
return box_record
def get_box(self):
pass
def delete_box(self, box_id: str):
# delete box
with db_transaction(self.db):
self.db.query(Box).filter(Box.id == box_id).delete()
# update manabox_export_data box_id to null
with db_transaction(self.db):
self.db.query(ManaboxExportData).filter(ManaboxExportData.box_id == box_id).update({ManaboxExportData.box_id: None})
return {"status": "success", "box_id": box_id}
def update_box(self):
pass

149
services/data.py Normal file
View File

@ -0,0 +1,149 @@
from sqlalchemy.orm import Session
import logging
from fastapi import BackgroundTasks
from db.models import TCGPlayerGroups, SetCodeGroupIdMapping, ManaboxExportData, TCGPlayerProduct, ManaboxTCGPlayerMapping, UnmatchedManaboxData, TCGPlayerInventory
from db.utils import db_transaction
import uuid
from services.tcgplayer import TCGPlayerService
from sqlalchemy.sql import exists
logger = logging.getLogger(__name__)
class DataService:
def __init__(self, db: Session, tcgplayer_service: TCGPlayerService):
self.db = db
self.tcgplayer_service = tcgplayer_service
def _normalize_rarity(self, rarity: str) -> str:
if rarity.lower() == "rare":
return "R"
elif rarity.lower() == "mythic":
return "M"
elif rarity.lower() == "uncommon":
return "U"
elif rarity.lower() == "common":
return "C"
elif rarity.lower() in ["R", "M", "U", "C"]:
return rarity.upper()
else:
raise ValueError(f"Invalid rarity: {rarity}")
def _normalize_condition(self, condition: str, foil: str) -> str:
if condition.lower() == "near_mint":
condition1 = "Near Mint"
else:
raise ValueError(f"Invalid condition: {condition}")
if foil.lower() == "foil":
condition2 = " Foil"
elif foil.lower() == "normal":
condition2 = ""
else:
raise ValueError(f"Invalid foil: {foil}")
return condition1 + condition2
def _normalize_number(self, number: str) -> str:
return str(number.split(".")[0])
def _convert_set_code_to_group_id(self, set_code: str) -> str:
group = self.db.query(TCGPlayerGroups).filter(TCGPlayerGroups.abbreviation == set_code).first()
return group.group_id
def _add_set_group_mapping(self, set_code: str, group_id: str) -> None:
with db_transaction(self.db):
self.db.add(SetCodeGroupIdMapping(id=str(uuid.uuid4()), set_code=set_code, group_id=group_id))
def _get_set_codes(self, **filters) -> list:
query = self.db.query(ManaboxExportData.set_code).distinct()
for field, value in filters.items():
if value is not None:
query = query.filter(getattr(ManaboxExportData, field) == value)
return [code[0] for code in query.all()]
async def bg_set_manabox_tcg_relationship(self, box_id: str = None, upload_id: str = None) -> None:
if not bool(box_id) ^ bool(upload_id):
raise ValueError("Must provide exactly one of box_id or upload_id")
filters = {"box_id": box_id} if box_id else {"upload_id": upload_id}
set_codes = self._get_set_codes(**filters)
for set_code in set_codes:
try:
group_id = self._convert_set_code_to_group_id(set_code)
except AttributeError:
logger.warning(f"No group found for set code {set_code}")
continue
self._add_set_group_mapping(set_code, group_id)
# update pricing for groups
if self.db.query(TCGPlayerProduct).filter(TCGPlayerProduct.group_id == group_id).count() == 0:
self.tcgplayer_service.update_pricing(set_name_ids={"set_name_ids":[group_id]})
# match manabox data to tcgplayer pricing data
# match on manabox - set_code (through group_id), collector_number, foil, rarity, condition
# match on tcgplayer - group_id, number, rarity, condition (condition + foil)
# use normalizing functions
matched_records = self.db.query(ManaboxExportData).filter(ManaboxExportData.set_code.in_(set_codes)).all()
for record in matched_records:
rarity = self._normalize_rarity(record.rarity)
condition = self._normalize_condition(record.condition, record.foil)
number = self._normalize_number(record.collector_number)
group_id = self._convert_set_code_to_group_id(record.set_code)
tcg_record = self.db.query(TCGPlayerProduct).filter(
TCGPlayerProduct.group_id == group_id,
TCGPlayerProduct.number == number,
TCGPlayerProduct.rarity == rarity,
TCGPlayerProduct.condition == condition
).all()
if len(tcg_record) == 0:
logger.warning(f"No match found for {record.name}")
if self.db.query(UnmatchedManaboxData).filter(UnmatchedManaboxData.manabox_id == record.id).count() == 0:
with db_transaction(self.db):
self.db.add(UnmatchedManaboxData(id=str(uuid.uuid4()), manabox_id=record.id, reason="No match found"))
elif len(tcg_record) > 1:
logger.warning(f"Multiple matches found for {record.name}")
if self.db.query(UnmatchedManaboxData).filter(UnmatchedManaboxData.manabox_id == record.id).count() == 0:
with db_transaction(self.db):
self.db.add(UnmatchedManaboxData(id=str(uuid.uuid4()), manabox_id=record.id, reason="Multiple matches found"))
else:
with db_transaction(self.db):
self.db.add(ManaboxTCGPlayerMapping(id=str(uuid.uuid4()), manabox_id=record.id, tcgplayer_id=tcg_record[0].id))
async def bg_set_tcg_inventory_product_relationship(self, export_id: str) -> None:
inventory_without_product = (
self.db.query(TCGPlayerInventory.tcgplayer_id, TCGPlayerInventory.set_name)
.filter(TCGPlayerInventory.total_quantity > 0)
.filter(TCGPlayerInventory.product_line == "Magic")
.filter(TCGPlayerInventory.export_id == export_id)
.filter(TCGPlayerInventory.tcgplayer_product_id.is_(None))
.filter(~exists().where(
TCGPlayerProduct.id == TCGPlayerInventory.tcgplayer_product_id
))
.all()
)
set_names = list(set(inv.set_name for inv in inventory_without_product
if inv.set_name is not None and isinstance(inv.set_name, str)))
group_ids = self.db.query(TCGPlayerGroups.group_id).filter(
TCGPlayerGroups.name.in_(set_names)
).all()
group_ids = [str(group_id[0]) for group_id in group_ids]
self.tcgplayer_service.update_pricing(set_name_ids={"set_name_ids": group_ids})
for inventory in inventory_without_product:
product = self.db.query(TCGPlayerProduct).filter(
TCGPlayerProduct.tcgplayer_id == inventory.tcgplayer_id
).first()
if product:
with db_transaction(self.db):
inventory_record = self.db.query(TCGPlayerInventory).filter(
TCGPlayerInventory.tcgplayer_id == inventory.tcgplayer_id,
TCGPlayerInventory.export_id == export_id
).first()
if inventory_record:
inventory_record.tcgplayer_product_id = product.id
self.db.add(inventory_record)

205
services/pricing.py Normal file
View File

@ -0,0 +1,205 @@
import logging
from typing import Callable
from db.models import TCGPlayerInventory, TCGPlayerExportHistory, TCGPlayerPricingHistory, ManaboxExportData, ManaboxTCGPlayerMapping, TCGPlayerProduct
from sqlalchemy.orm import Session
import pandas as pd
from db.utils import db_transaction
from sqlalchemy import func, and_, exists
logger = logging.getLogger(__name__)
class PricingService:
def __init__(self, db: Session):
self.db = db
def get_box_with_most_recent_prices(self, box_id: str) -> pd.DataFrame:
latest_prices = (
self.db.query(
TCGPlayerPricingHistory.tcgplayer_product_id,
func.max(TCGPlayerPricingHistory.date_created).label('max_date')
)
.group_by(TCGPlayerPricingHistory.tcgplayer_product_id)
.subquery('latest') # Added name to subquery
)
result = (
self.db.query(ManaboxExportData, TCGPlayerPricingHistory, TCGPlayerProduct)
.join(ManaboxTCGPlayerMapping, ManaboxExportData.id == ManaboxTCGPlayerMapping.manabox_id)
.join(TCGPlayerProduct, ManaboxTCGPlayerMapping.tcgplayer_id == TCGPlayerProduct.id)
.join(TCGPlayerPricingHistory, TCGPlayerProduct.id == TCGPlayerPricingHistory.tcgplayer_product_id)
.join(
latest_prices,
and_(
TCGPlayerPricingHistory.tcgplayer_product_id == latest_prices.c.tcgplayer_product_id,
TCGPlayerPricingHistory.date_created == latest_prices.c.max_date
)
)
.filter(ManaboxExportData.box_id == box_id) # Removed str() conversion
.all()
)
logger.debug(f"Found {len(result)} rows")
df = pd.DataFrame([{
**{f"manabox_{k}": v for k, v in row[0].__dict__.items() if not k.startswith('_')},
**{f"pricing_{k}": v for k, v in row[1].__dict__.items() if not k.startswith('_')},
**{f"tcgproduct_{k}": v for k, v in row[2].__dict__.items() if not k.startswith('_')}
} for row in result])
return df
def get_live_inventory_with_most_recent_prices(self) -> pd.DataFrame:
# Get latest export IDs using subqueries
latest_inventory_export = (
self.db.query(TCGPlayerExportHistory.inventory_export_id)
.filter(TCGPlayerExportHistory.type == "live_inventory")
.order_by(TCGPlayerExportHistory.date_created.desc())
.limit(1)
.scalar_subquery()
)
# this is bad because latest pricing export is not guaranteed to be related to the latest inventory export
latest_pricing_export = (
self.db.query(TCGPlayerExportHistory.pricing_export_id)
.filter(TCGPlayerExportHistory.type == "pricing")
.order_by(TCGPlayerExportHistory.date_created.desc())
.limit(1)
.scalar_subquery()
)
# Join inventory and pricing data in a single query
inventory_with_pricing = (
self.db.query(TCGPlayerInventory, TCGPlayerPricingHistory)
.join(
TCGPlayerPricingHistory,
TCGPlayerInventory.tcgplayer_product_id == TCGPlayerPricingHistory.tcgplayer_product_id
)
.filter(
TCGPlayerInventory.export_id == latest_inventory_export,
TCGPlayerPricingHistory.export_id == latest_pricing_export
)
.all()
)
# Convert to pandas DataFrame
df = pd.DataFrame([{
# Inventory columns
**{f"inventory_{k}": v
for k, v in row[0].__dict__.items()
if not k.startswith('_')},
# Pricing columns
**{f"pricing_{k}": v
for k, v in row[1].__dict__.items()
if not k.startswith('_')}
} for row in inventory_with_pricing])
return df
def default_pricing_algo(self, df: pd.DataFrame = None):
if df is None:
logger.debug("No DataFrame provided, fetching live inventory with most recent prices")
df = self.get_live_inventory_with_most_recent_prices()
# if tcg low price is < 0.35, set my_price to 0.35
# if either tcg low price or tcg low price with shipping is under 5, set my_price to tcg low price * 1.25
# if tcg low price with shipping is > 25 set price to tcg low price with shipping * 1.025
# otherwise, set price to tcg low price with shipping * 1.10
# also round to 2 decimal places
df['my_price'] = df.apply(lambda row: round(
0.35 if row['pricing_tcg_low_price'] < 0.35 else
row['pricing_tcg_low_price'] * 1.25 if row['pricing_tcg_low_price'] < 5 or row['pricing_tcg_low_price_with_shipping'] < 5 else
row['pricing_tcg_low_price_with_shipping'] * 1.025 if row['pricing_tcg_low_price_with_shipping'] > 25 else
row['pricing_tcg_low_price_with_shipping'] * 1.10, 2), axis=1)
# log rows with no price
no_price = df[df['my_price'].isnull()]
if len(no_price) > 0:
logger.warning(f"Found {len(no_price)} rows with no price")
logger.warning(no_price)
# remove rows with no price
df = df.dropna(subset=['my_price'])
return df
def convert_df_to_csv(self, df: pd.DataFrame):
# Flip the mapping to be from current names TO desired names
column_mapping = {
'inventory_tcgplayer_id': 'TCGplayer Id',
'inventory_product_line': 'Product Line',
'inventory_set_name': 'Set Name',
'inventory_product_name': 'Product Name',
'inventory_title': 'Title',
'inventory_number': 'Number',
'inventory_rarity': 'Rarity',
'inventory_condition': 'Condition',
'pricing_tcg_market_price': 'TCG Market Price',
'pricing_tcg_direct_low': 'TCG Direct Low',
'pricing_tcg_low_price_with_shipping': 'TCG Low Price With Shipping',
'pricing_tcg_low_price': 'TCG Low Price',
'inventory_total_quantity': 'Total Quantity',
'inventory_add_to_quantity': 'Add to Quantity',
'my_price': 'TCG Marketplace Price',
'inventory_photo_url': 'Photo URL'
}
df['pricing_tcg_market_price'] = ""
df['pricing_tcg_direct_low'] = ""
df['pricing_tcg_low_price_with_shipping'] = ""
df['pricing_tcg_low_price'] = ""
df['inventory_total_quantity'] = ""
df['inventory_add_to_quantity'] = 0
df['inventory_photo_url'] = ""
# First select the columns we want (using the keys of our mapping)
# Then rename them to the desired names (the values in our mapping)
df = df[column_mapping.keys()].rename(columns=column_mapping)
return df.to_csv(index=False, quoting=1, quotechar='"')
def convert_add_df_to_csv(self, df: pd.DataFrame):
column_mapping = {
'tcgproduct_tcgplayer_id': 'TCGplayer Id',
'tcgproduct_product_line': 'Product Line',
'tcgproduct_set_name': 'Set Name',
'tcgproduct_product_name': 'Product Name',
'tcgproduct_title': 'Title',
'tcgproduct_number': 'Number',
'tcgproduct_rarity': 'Rarity',
'tcgproduct_condition': 'Condition',
'pricing_tcg_market_price': 'TCG Market Price',
'pricing_tcg_direct_low': 'TCG Direct Low',
'pricing_tcg_low_price_with_shipping': 'TCG Low Price With Shipping',
'pricing_tcg_low_price': 'TCG Low Price',
'tcgproduct_group_id': 'Total Quantity',
'manabox_quantity': 'Add to Quantity',
'my_price': 'TCG Marketplace Price',
'tcgproduct_photo_url': 'Photo URL'
}
df['tcgproduct_group_id'] = ""
df['pricing_tcg_market_price'] = ""
df['pricing_tcg_direct_low'] = ""
df['pricing_tcg_low_price_with_shipping'] = ""
df['pricing_tcg_low_price'] = ""
df['tcgproduct_photo_url'] = ""
df = df[column_mapping.keys()].rename(columns=column_mapping)
return df.to_csv(index=False, quoting=1, quotechar='"')
def create_live_inventory_pricing_update_csv(self, algo: Callable = None) -> str:
actual_algo = algo if algo is not None else self.default_pricing_algo
df = actual_algo()
csv = self.convert_df_to_csv(df)
return csv
def create_add_to_tcgplayer_csv(self, box_id: str = None, upload_id: str = None, algo: Callable = None) -> str:
actual_algo = algo if algo is not None else self.default_pricing_algo
if box_id and upload_id:
raise ValueError("Cannot specify both box_id and upload_id")
elif not box_id and not upload_id:
raise ValueError("Must specify either box_id or upload_id")
elif box_id:
logger.debug("creating df")
df = self.get_box_with_most_recent_prices(box_id)
elif upload_id:
raise NotImplementedError("Not yet implemented")
df = actual_algo(df)
csv = self.convert_add_df_to_csv(df)
return csv

452
services/tcgplayer.py Normal file
View File

@ -0,0 +1,452 @@
from db.models import ManaboxExportData, Box, TCGPlayerGroups, TCGPlayerInventory, TCGPlayerExportHistory, TCGPlayerPricingHistory, TCGPlayerProduct, ManaboxTCGPlayerMapping
import requests
from sqlalchemy.orm import Session
from db.utils import db_transaction
import uuid
import browser_cookie3
import webbrowser
from typing import Optional, Dict ,List
from enum import Enum
import logging
from dataclasses import dataclass
import urllib.parse
import json
from datetime import datetime
import time
import csv
from typing import List, Dict, Optional
from io import StringIO, BytesIO
from services.pricing import PricingService
from sqlalchemy.sql import exists
logger = logging.getLogger(__name__)
class Browser(Enum):
"""Supported browser types for cookie extraction"""
BRAVE = "brave"
CHROME = "chrome"
FIREFOX = "firefox"
@dataclass
class TCGPlayerConfig:
"""Configuration for TCGPlayer API interactions"""
tcgplayer_base_url: str = "https://store.tcgplayer.com"
tcgplayer_login_path: str = "/oauth/login"
staged_inventory_download_path: str = "/Admin/Pricing/DownloadStagedInventoryExportCSV?type=Pricing"
live_inventory_download_path = "/Admin/Pricing/DownloadMyExportCSV?type=Pricing"
pricing_export_path: str = "/admin/pricing/downloadexportcsv"
max_retries: int = 1
class TCGPlayerService:
def __init__(self, db: Session,
pricing_service: PricingService,
config: TCGPlayerConfig=TCGPlayerConfig(),
browser_type: Browser=Browser.BRAVE):
self.db = db
self.config = config
self.browser_type = browser_type
self.cookies = None
self.previous_request_time = None
self.pricing_service = pricing_service
def _insert_groups(self, groups):
for group in groups:
db_group = TCGPlayerGroups(
id=str(uuid.uuid4()),
group_id=group['groupId'],
name=group['name'],
abbreviation=group['abbreviation'],
is_supplemental=group['isSupplemental'],
published_on=group['publishedOn'],
modified_on=group['modifiedOn'],
category_id=group['categoryId']
)
self.db.add(db_group)
def populate_tcgplayer_groups(self):
group_endpoint = "https://tcgcsv.com/tcgplayer/1/groups"
response = requests.get(group_endpoint)
response.raise_for_status()
groups = response.json()['results']
# manually add broken groups
groups.append({
"groupId": 2422,
"name": "Modern Horizons 2 Timeshifts",
"abbreviation": "H2R",
"isSupplemental": "false",
"publishedOn": "2018-11-08T00:00:00",
"modifiedOn": "2018-11-08T00:00:00",
"categoryId": 1
})
# Insert groups into db
with db_transaction(self.db):
self._insert_groups(groups)
def _get_browser_cookies(self) -> Optional[Dict]:
"""Retrieve cookies from the specified browser"""
try:
cookie_getter = getattr(browser_cookie3, self.browser_type.value, None)
if not cookie_getter:
raise ValueError(f"Unsupported browser type: {self.browser_type.value}")
return cookie_getter()
except Exception as e:
logger.error(f"Failed to get browser cookies: {str(e)}")
return None
def _send_request(self, url: str, method: str, data=None, except_302=False) -> requests.Response:
"""Send a request with the specified cookies"""
# if previous request was made less than 10 seconds ago, wait until current time is 10 seconds after previous request
if self.previous_request_time:
time_diff = (datetime.now() - self.previous_request_time).total_seconds()
if time_diff < 10:
logger.info(f"Waiting 10 seconds before next request...")
time.sleep(10 - time_diff)
headers = self._set_headers(method)
if not self.cookies:
self.cookies = self._get_browser_cookies()
if not self.cookies:
raise ValueError("Failed to retrieve browser cookies")
try:
#logger.info(f"debug: request url {url}, method {method}, data {data}")
response = requests.request(method, url, headers=headers, cookies=self.cookies, data=data)
response.raise_for_status()
if response.status_code == 302 and not except_302:
logger.warning("Redirecting to login page...")
self._refresh_authentication()
return self._send_request(url, method, except_302=True)
elif response.status_code == 302 and except_302:
raise ValueError("Redirected to login page after authentication refresh")
self.previous_request_time = datetime.now()
return response
except requests.RequestException as e:
logger.error(f"Request failed: {str(e)}")
return None
def _set_headers(self, method: str) -> Dict:
base_headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'accept-language': 'en-US,en;q=0.8',
'priority': 'u=0, i',
'referer': 'https://store.tcgplayer.com/admin/pricing',
'sec-ch-ua': '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'sec-gpc': '1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36'
}
if method == 'POST':
post_headers = {
'cache-control': 'max-age=0',
'content-type': 'application/x-www-form-urlencoded',
'origin': 'https://store.tcgplayer.com'
}
base_headers.update(post_headers)
return base_headers
def _set_pricing_export_payload(self, set_name_ids: List[str]) -> Dict:
data = {
"PricingType": "Pricing",
"CategoryId": "1",
"SetNameIds": set_name_ids,
"ConditionIds": ["1"],
"RarityIds": ["0"],
"LanguageIds": ["1"],
"PrintingIds": ["0"],
"CompareAgainstPrice": False,
"PriceToCompare": 3,
"ValueToCompare": 1,
"PriceValueToCompare": None,
"MyInventory": False,
"ExcludeListos": False,
"ExportLowestListingNotMe": False
}
payload = "model=" + urllib.parse.quote(json.dumps(data))
return payload
def _refresh_authentication(self) -> None:
"""Open browser for user to refresh authentication"""
login_url = f"{self.config.tcgplayer_base_url}{self.config.tcgplayer_login_path}"
logger.info("Opening browser for authentication refresh...")
webbrowser.open(login_url)
input('Please login and press Enter to continue...')
# Clear existing cookies to force refresh
self.cookies = None
def _get_inventory(self, version) -> bytes:
if version == 'staged':
inventory_download_url = f"{self.config.tcgplayer_base_url}{self.config.staged_inventory_download_path}"
elif version == 'live':
inventory_download_url = f"{self.config.tcgplayer_base_url}{self.config.live_inventory_download_path}"
else:
raise ValueError("Invalid inventory version")
response = self._send_request(inventory_download_url, 'GET')
if response:
return self._process_content(response.content)
return None
def _process_content(self, content: bytes) -> List[Dict]:
if not content:
return []
try:
text_content = content.decode('utf-8')
except UnicodeDecodeError:
for encoding in ['latin-1', 'cp1252', 'iso-8859-1']:
try:
text_content = content.decode(encoding)
break
except UnicodeDecodeError:
continue
else:
raise
csv_file = StringIO(text_content)
try:
reader = csv.DictReader(csv_file)
inventory = [
{k: v.strip() if v else None for k, v in row.items()}
for row in reader
if any(v.strip() for v in row.values())
]
return inventory
finally:
csv_file.close()
def update_inventory(self, version: str) -> Dict:
if version not in ['staged', 'live']:
raise ValueError("Invalid inventory version")
export_id = str(uuid.uuid4())
inventory = self._get_inventory(version)
if not inventory:
return {"message": "No inventory to update"}
# add snapshot id
for item in inventory:
item['export_id'] = export_id
# check if product exists for tcgplayer_id
product_exists = self.db.query(TCGPlayerProduct).filter_by(tcgplayer_id=item['TCGplayer Id']).first()
if product_exists:
item['tcgplayer_product_id'] = product_exists.id
else:
item['tcgplayer_product_id'] = None
inventory_fields = {
'TCGplayer Id': 'tcgplayer_id',
'tcgplayer_product_id': 'tcgplayer_product_id',
'export_id': 'export_id',
'Product Line': 'product_line',
'Set Name': 'set_name',
'Product Name': 'product_name',
'Title': 'title',
'Number': 'number',
'Rarity': 'rarity',
'Condition': 'condition',
'TCG Market Price': 'tcg_market_price',
'TCG Direct Low': 'tcg_direct_low',
'TCG Low Price With Shipping': 'tcg_low_price_with_shipping',
'TCG Low Price': 'tcg_low_price',
'Total Quantity': 'total_quantity',
'Add to Quantity': 'add_to_quantity',
'TCG Marketplace Price': 'tcg_marketplace_price'
}
with db_transaction(self.db):
export_history = TCGPlayerExportHistory(
id=str(uuid.uuid4()),
type=version + '_inventory',
inventory_export_id=export_id
)
self.db.add(export_history)
for item in inventory:
db_item = TCGPlayerInventory(
id=str(uuid.uuid4()),
**{db_field: item.get(csv_field)
for csv_field, db_field in inventory_fields.items()}
)
self.db.add(db_item)
return {"message": "Inventory updated successfully", "export_id": export_id}
def _get_export_csv(self, set_name_ids: List[str]) -> bytes:
"""
Download export CSV and save to specified path
Returns True if successful, False otherwise
"""
payload = self._set_pricing_export_payload(set_name_ids)
export_csv_download_url = f"{self.config.tcgplayer_base_url}{self.config.pricing_export_path}"
response = self._send_request(export_csv_download_url, method='POST', data=payload)
csv = self._process_content(response.content)
return csv
def _update_tcgplayer_products(self):
pass
def update_pricing(self, set_name_ids: Dict[str, List[str]]) -> Dict:
export_id = str(uuid.uuid4())
product_fields = {
'TCGplayer Id': 'tcgplayer_id',
'group_id': 'group_id',
'Product Line': 'product_line',
'Set Name': 'set_name',
'Product Name': 'product_name',
'Title': 'title',
'Number': 'number',
'Rarity': 'rarity',
'Condition': 'condition'
}
pricing_fields = {
'TCGplayer Id': 'tcgplayer_id',
'tcgplayer_product_id': 'tcgplayer_product_id',
'export_id': 'export_id',
'group_id': 'group_id',
'TCG Market Price': 'tcg_market_price',
'TCG Direct Low': 'tcg_direct_low',
'TCG Low Price With Shipping': 'tcg_low_price_with_shipping',
'TCG Low Price': 'tcg_low_price',
'TCG Marketplace Price': 'tcg_marketplace_price'
}
for set_name_id in set_name_ids['set_name_ids']:
export_csv = self._get_export_csv([set_name_id])
for item in export_csv:
item['export_id'] = export_id
item['group_id'] = set_name_id
# check if product already exists
product_exists = self.db.query(TCGPlayerProduct).filter_by(tcgplayer_id=item['TCGplayer Id']).first()
if product_exists:
item['tcgplayer_product_id'] = product_exists.id
else:
with db_transaction(self.db):
product = TCGPlayerProduct(
id=str(uuid.uuid4()),
**{db_field: item.get(csv_field)
for csv_field, db_field in product_fields.items()}
)
self.db.add(product)
item['tcgplayer_product_id'] = product.id
with db_transaction(self.db):
ph_item = TCGPlayerPricingHistory(
id=str(uuid.uuid4()),
**{db_field: item.get(csv_field)
for csv_field, db_field in pricing_fields.items()}
)
self.db.add(ph_item)
with db_transaction(self.db):
export_history = TCGPlayerExportHistory(
id=str(uuid.uuid4()),
type='pricing',
pricing_export_id=export_id
)
self.db.add(export_history)
return {"message": "Pricing updated successfully"}
def update_pricing_all(self) -> Dict:
set_name_ids = self.db.query(TCGPlayerGroups.group_id).all()
set_name_ids = [str(group_id) for group_id, in set_name_ids]
return self.update_pricing({'set_name_ids': set_name_ids})
def update_pricing_for_existing_product_groups(self) -> Dict:
set_name_ids = self.db.query(TCGPlayerProduct.group_id).distinct().all()
set_name_ids = [str(group_id) for group_id, in set_name_ids]
return self.update_pricing({'set_name_ids': set_name_ids})
def tcg_set_tcg_inventory_product_relationship(self, export_id: str) -> None:
inventory_without_product = (
self.db.query(TCGPlayerInventory.tcgplayer_id, TCGPlayerInventory.set_name)
.filter(TCGPlayerInventory.total_quantity > 0)
.filter(TCGPlayerInventory.product_line == "Magic")
.filter(TCGPlayerInventory.export_id == export_id)
.filter(TCGPlayerInventory.tcgplayer_product_id.is_(None))
.filter(~exists().where(
TCGPlayerProduct.id == TCGPlayerInventory.tcgplayer_product_id
))
.all()
)
set_names = list(set(inv.set_name for inv in inventory_without_product
if inv.set_name is not None and isinstance(inv.set_name, str)))
group_ids = self.db.query(TCGPlayerGroups.group_id).filter(
TCGPlayerGroups.name.in_(set_names)
).all()
group_ids = [str(group_id[0]) for group_id in group_ids]
self.update_pricing(set_name_ids={"set_name_ids": group_ids})
for inventory in inventory_without_product:
product = self.db.query(TCGPlayerProduct).filter(
TCGPlayerProduct.tcgplayer_id == inventory.tcgplayer_id
).first()
if product:
with db_transaction(self.db):
inventory_record = self.db.query(TCGPlayerInventory).filter(
TCGPlayerInventory.tcgplayer_id == inventory.tcgplayer_id,
TCGPlayerInventory.export_id == export_id
).first()
if inventory_record:
inventory_record.tcgplayer_product_id = product.id
self.db.add(inventory_record)
def get_live_inventory_pricing_update_csv(self):
export_id = self.update_inventory("live")['export_id']
self.tcg_set_tcg_inventory_product_relationship(export_id)
self.update_pricing_for_existing_product_groups()
update_csv = self.pricing_service.create_live_inventory_pricing_update_csv()
return update_csv
def get_group_ids_for_box(self, box_id: str) -> List[str]:
# use manabox_export_data.box_id and tcgplayer_product.group_id to filter
# use manabox_tcgplayer_mapping.manabox_id and manabox_tcgplayer_mapping.tcgplayer_id to join
group_ids = self.db.query(ManaboxExportData.box_id, TCGPlayerProduct.group_id).join(
ManaboxTCGPlayerMapping, ManaboxExportData.id == ManaboxTCGPlayerMapping.manabox_id
).join(
TCGPlayerProduct, ManaboxTCGPlayerMapping.tcgplayer_id == TCGPlayerProduct.id
).filter(ManaboxExportData.box_id == box_id).all()
group_ids = list(set(str(group_id) for box_id, group_id in group_ids))
return group_ids
def get_group_ids_for_upload(self, upload_id: str) -> List[str]:
group_ids = self.db.query(ManaboxExportData.upload_id, TCGPlayerProduct.group_id).join(
ManaboxTCGPlayerMapping, ManaboxExportData.id == ManaboxTCGPlayerMapping.manabox_id
).join(
TCGPlayerProduct, ManaboxTCGPlayerMapping.tcgplayer_id == TCGPlayerProduct.id
).filter(ManaboxExportData.upload_id == upload_id).all()
group_ids = list(set(str(group_id) for upload_id, group_id in group_ids))
return group_ids
def add_to_tcgplayer(self, box_id: str = None, upload_id: str = None) :
if box_id and upload_id:
raise ValueError("Cannot provide both box_id and upload_id")
elif box_id:
group_ids = self.get_group_ids_for_box(box_id)
elif upload_id:
group_ids = self.get_group_ids_for_upload(upload_id)
else:
raise ValueError("Must provide either box_id or upload_id")
self.update_pricing({'set_name_ids': group_ids})
add_csv = self.pricing_service.create_add_to_tcgplayer_csv(box_id)
return add_csv

97
services/upload.py Normal file
View File

@ -0,0 +1,97 @@
from db.models import ManaboxExportData, UploadHistory
import pandas as pd
from io import StringIO
import uuid
from sqlalchemy.orm import Session
from db.utils import db_transaction
from exceptions import FailedUploadException
import logging
logger = logging.getLogger(__name__)
class UploadObject:
def __init__(self,
content: bytes = None,
upload_id: str = None,
filename: str = None,
df: pd.DataFrame = None):
self.content = content
self.upload_id = upload_id
self.filename = filename
self.df = df
class UploadService:
def __init__(self, db: Session):
self.db = db
def _content_to_df(self, content: bytes) -> pd.DataFrame:
df = pd.read_csv(StringIO(content.decode('utf-8')))
df.columns = df.columns.str.lower().str.replace(' ', '_')
return df
def _create_upload_id(self) -> str:
return str(uuid.uuid4())
def _prepare_manabox_df(self, content: bytes, upload_id: str) -> pd.DataFrame:
df = self._content_to_df(content)
df['upload_id'] = upload_id
df['box_id'] = None
return df
def _create_file_upload_record(self, upload_id: str, filename: str) -> UploadHistory:
file_upload_record = UploadHistory(
id = str(uuid.uuid4()),
upload_id = upload_id,
filename = filename,
status = "pending"
)
self.db.add(file_upload_record)
return file_upload_record
def _update_manabox_data(self, df: pd.DataFrame) -> bool:
for index, row in df.iterrows():
try:
add_row = ManaboxExportData(
id = str(uuid.uuid4()),
upload_id = row['upload_id'],
box_id = row['box_id'],
name = row['name'],
set_code = row['set_code'],
set_name = row['set_name'],
collector_number = row['collector_number'],
foil = row['foil'],
rarity = row['rarity'],
quantity = row['quantity'],
manabox_id = row['manabox_id'],
scryfall_id = row['scryfall_id'],
purchase_price = row['purchase_price'],
misprint = row['misprint'],
altered = row['altered'],
condition = row['condition'],
language = row['language'],
purchase_price_currency = row['purchase_price_currency']
)
self.db.add(add_row)
except Exception as e:
logger.error(f"Error adding row to ManaboxExportData")
return False
return True
def process_manabox_upload(self, content: bytes, filename: str):
upload = UploadObject(content=content, filename=filename)
upload.upload_id = self._create_upload_id()
upload.df = self._prepare_manabox_df(upload.content, upload.upload_id)
with db_transaction(self.db):
file_upload_record = self._create_file_upload_record(upload.upload_id, upload.filename)
if not self._update_manabox_data(upload.df):
# set upload to failed
file_upload_record.status = "failed"
raise FailedUploadException(file_upload_record)
else:
# set upload_history status to success
file_upload_record.status = "success"
return {"message": f"Manabox upload successful. Upload ID: {upload.upload_id}"}, upload.upload_id