giga_tcg/app/services/tcgplayer_api.py
2025-02-21 11:46:50 -05:00

105 lines
4.4 KiB
Python

from dataclasses import dataclass
import logging
from app.db.models import Orders, OrderProducts, CardTCGPlayer
from app.services.util._requests import RequestsUtil
from app.services.util._docker import DockerUtil
from app.db.utils import db_transaction
from sqlalchemy.orm import Session
from uuid import uuid4 as uuid
logger = logging.getLogger(__name__)
@dataclass
class TCGPlayerAPIConfig:
    """Endpoint settings used to build TCGPlayer order-management URLs.

    Both values are plain string fragments; a full order URL is formed as
    ``ORDER_BASE_URL + "/" + <order id> + API_VERSION``.
    """

    # Root of the orders endpoint (no trailing slash).
    ORDER_BASE_URL: str = "https://order-management-api.tcgplayer.com/orders"

    # Query-string suffix that pins the API version; includes the leading "?".
    API_VERSION: str = "?api-version=2.0"
class TCGPlayerAPIService:
    """Fetch orders from the TCGPlayer order-management API and store them.

    Request cookies are loaded once, when the service is constructed, and are
    reused for every call made through ``get_order``.
    """

    # The annotation is quoted so it is not evaluated at definition time.
    def __init__(self, db: "Session"):
        """Set up helpers, configuration and request cookies.

        Args:
            db: Database session used for all queries and inserts.
        """
        self.db = db
        self.docker_util = DockerUtil()
        self.requests_util = RequestsUtil()
        self.is_in_docker = self.docker_util.is_in_docker()
        self.config = TCGPlayerAPIConfig()
        self.cookies = self.get_cookies()

    def get_cookies(self) -> dict:
        """Return TCGPlayer cookies from the source matching the runtime.

        Uses ``get_tcgplayer_cookies_from_file`` when ``is_in_docker`` is
        true, and ``get_tcgplayer_browser_cookies`` otherwise.
        """
        if self.is_in_docker:
            return self.requests_util.get_tcgplayer_cookies_from_file()
        return self.requests_util.get_tcgplayer_browser_cookies()

    def get_order(self, order_id: str) -> "dict | None":
        """Fetch a single order from the API.

        Args:
            order_id: Order identifier appended to the base URL.

        Returns:
            The decoded JSON body, or ``None`` when ``send_request`` returns a
            falsy response.
        """
        url = f"{self.config.ORDER_BASE_URL}/{order_id}{self.config.API_VERSION}"
        response = self.requests_util.send_request(
            url, method='GET', cookies=self.cookies
        )
        if not response:
            return None
        return response.json()

    def get_product_ids_from_sku(self, sku_ids: list[str]) -> dict:
        """Get product IDs from TCGPlayer SKU IDs.

        Args:
            sku_ids: SKU identifiers matched against
                ``CardTCGPlayer.tcgplayer_id``.

        Returns:
            Mapping of ``tcgplayer_id`` to ``product_id`` for every SKU found.
            SKUs with no matching row are absent from the mapping.
        """
        # Nothing to look up: skip the round trip and the empty IN () clause.
        if not sku_ids:
            return {}
        tcg_cards = (
            self.db.query(CardTCGPlayer)
            .filter(CardTCGPlayer.tcgplayer_id.in_(sku_ids))
            .all()
        )
        return {card.tcgplayer_id: card.product_id for card in tcg_cards}

    def save_order(self, order: dict):
        """Persist an order and its line items unless it is already stored.

        Args:
            order: Order payload. Must contain ``orderNumber``, ``buyerName``,
                ``createdAt``, ``status``, ``transaction``,
                ``shippingAddress`` and ``products``.

        Returns:
            The existing ``Orders`` row when one with the same order number is
            found, otherwise the newly created row.

        Raises:
            KeyError: If a required key is missing from the payload.
        """
        # Check whether the order already exists, keyed by order number.
        order_number = order['orderNumber']
        existing_order = (
            self.db.query(Orders).filter(Orders.order_id == order_number).first()
        )
        if existing_order:
            logger.info("Order %s already exists in database", order_number)
            return existing_order

        transaction = order['transaction']
        shipping = order['shippingAddress']
        products = order['products']

        with db_transaction(self.db):
            db_order = Orders(
                id=str(uuid()),
                order_id=order_number,
                buyer_name=order['buyerName'],
                recipient_name=shipping['recipientName'],
                recipient_address_one=shipping['addressOne'],
                # The second address line is optional in the payload.
                recipient_address_two=shipping.get('addressTwo', ''),
                recipient_city=shipping['city'],
                recipient_state=shipping['territory'],
                recipient_zip=shipping['postalCode'],
                recipient_country=shipping['country'],
                order_date=order['createdAt'],
                status=order['status'],
                num_products=len(products),
                num_cards=sum(product['quantity'] for product in products),
                product_amount=transaction['productAmount'],
                shipping_amount=transaction['shippingAmount'],
                gross_amount=transaction['grossAmount'],
                fee_amount=transaction['feeAmount'],
                net_amount=transaction['netAmount'],
                direct_fee_amount=transaction['directFeeAmount'],
            )
            self.db.add(db_order)

            sku_ids = [product['skuId'] for product in products]
            sku_to_product_id_mapping = self.get_product_ids_from_sku(sku_ids)

            order_products = []
            for product in products:
                product_id = sku_to_product_id_mapping.get(product['skuId'])
                if not product_id:
                    # Unknown SKUs are still skipped, but no longer silently:
                    # the stored order would otherwise be missing line items
                    # with no trace of why.
                    logger.warning(
                        "No product mapping for SKU %s in order %s; "
                        "skipping line item",
                        product['skuId'],
                        order_number,
                    )
                    continue
                order_products.append(
                    OrderProducts(
                        id=str(uuid()),
                        order_id=db_order.id,
                        product_id=product_id,
                        quantity=product['quantity'],
                        unit_price=product['unitPrice'],
                    )
                )
            self.db.add_all(order_products)
        return db_order

    def process_orders(self, orders: list[str]):
        """Fetch and save each order ID, returning the IDs that were fetched.

        Args:
            orders: Order identifiers to process.

        Returns:
            The IDs for which ``get_order`` returned a payload, in input
            order. IDs of orders already present in the database are included
            as well, because ``save_order`` returns normally for them.
        """
        processed_orders = []
        for order_id in orders:
            order = self.get_order(order_id)
            if order:
                self.save_order(order)
                processed_orders.append(order_id)
        return processed_orders