import csv
import io
import json
import logging
import os
import shutil
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Union

import py7zr
from sqlalchemy.orm import Session

from app.contexts.inventory_item import InventoryItemContextFactory
from app.db.database import transaction as db_transaction
from app.models.critical_error_log import CriticalErrorLog
from app.models.inventory_management import SealedExpectedValue
from app.models.tcgplayer_products import (
    MTGJSONCard,
    MTGJSONSKU,
    TCGPlayerCategory,
    TCGPlayerGroup,
    TCGPlayerPriceHistory,
    TCGPlayerProduct,
)
from app.schemas.file import FileInDB
from app.schemas.transaction import PurchaseItem, PurchaseTransactionCreate
from app.services.base_service import BaseService

logger = logging.getLogger(__name__)


class DataInitializationService(BaseService):
    """Loads and syncs third-party data (TCGCSV/TCGplayer and MTGJSON) into the local database."""

    def __init__(self):
        super().__init__(None)

    async def _cache_data(
        self,
        db: Session,
        data: Union[dict, list],
        filename: str,
        subdir: str,
        default_str: bool = False,
        file_type: str = "json",
        content_type: str = "application/json",
        metadata: Optional[Dict] = None
    ) -> FileInDB:
        """Generic helper to cache data to a JSON file via the file service."""
        file_data = json.dumps(data, default=str if default_str else None, indent=2)
        return await self.file_service.save_file(
            db,
            file_data,
            filename,
            subdir,
            file_type=file_type,
            content_type=content_type,
            metadata=metadata
        )

    async def _load_cached_data(
        self,
        db: Session,
        filename: str
    ) -> Optional[Dict[str, Any]]:
        """Generic helper to load cached data from a JSON file, with a 7-day expiration."""
        file_record = await self.file_service.get_file_by_filename(db, filename)
        if file_record:
            # Check whether the cache is expired (7 days).
            # Ensure both datetimes are timezone-aware before subtracting.
            created_at = file_record.created_at
            if created_at.tzinfo is None:
                created_at = created_at.replace(tzinfo=timezone.utc)
            cache_age = datetime.now(timezone.utc) - created_at
            if cache_age.days < 7:
                with open(file_record.path, 'r') as f:
                    return json.load(f)
            else:
                logger.info(f"Cache expired for {filename}, age: {cache_age.days} days")
                # Delete the expired cache file
                await self.file_service.delete_file(db, file_record.id)
        return None

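    # Illustrative sketch (not executed anywhere in this module): a typical round
    # trip through the two helpers above would look like
    #
    #     await self._cache_data(db, data, "categories.json", "tcgcsv/categories")
    #     ...
    #     cached = await self._load_cached_data(db, "categories.json")  # None if missing or expired
    #
    # The init_* methods below use this pair, choosing between the cached copy and
    # a fresh API download based on the use_cache flag.
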
    async def sync_categories(self, db: Session, categories_data: dict):
        """Sync categories data to the database in batches to handle large datasets."""
        categories = categories_data.get("results", [])
        batch_size = 1000  # Process in batches of 1000
        total_categories = len(categories)

        with db_transaction(db):
            for i in range(0, total_categories, batch_size):
                batch = categories[i:i + batch_size]
                for category_data in batch:
                    existing_category = db.query(TCGPlayerCategory).filter(TCGPlayerCategory.category_id == category_data["categoryId"]).first()
                    if existing_category:
                        # Update existing category
                        for key, value in {
                            "name": category_data["name"],
                            "display_name": category_data.get("displayName"),
                            "seo_category_name": category_data.get("seoCategoryName"),
                            "category_description": category_data.get("categoryDescription"),
                            "category_page_title": category_data.get("categoryPageTitle"),
                            "sealed_label": category_data.get("sealedLabel"),
                            "non_sealed_label": category_data.get("nonSealedLabel"),
                            "condition_guide_url": category_data.get("conditionGuideUrl"),
                            "is_scannable": category_data.get("isScannable", False),
                            "popularity": category_data.get("popularity", 0),
                            "is_direct": category_data.get("isDirect", False),
                            "modified_on": datetime.fromisoformat(category_data["modifiedOn"].replace("Z", "+00:00")) if category_data.get("modifiedOn") else None
                        }.items():
                            setattr(existing_category, key, value)
                    else:
                        new_category = TCGPlayerCategory(
                            category_id=category_data["categoryId"],
                            name=category_data["name"],
                            display_name=category_data.get("displayName"),
                            seo_category_name=category_data.get("seoCategoryName"),
                            category_description=category_data.get("categoryDescription"),
                            category_page_title=category_data.get("categoryPageTitle"),
                            sealed_label=category_data.get("sealedLabel"),
                            non_sealed_label=category_data.get("nonSealedLabel"),
                            condition_guide_url=category_data.get("conditionGuideUrl"),
                            is_scannable=category_data.get("isScannable", False),
                            popularity=category_data.get("popularity", 0),
                            is_direct=category_data.get("isDirect", False),
                            modified_on=datetime.fromisoformat(category_data["modifiedOn"].replace("Z", "+00:00")) if category_data.get("modifiedOn") else None
                        )
                        db.add(new_category)

                # Commit after each batch
                db.commit()
                logger.info(f"Processed {min(i + batch_size, total_categories)}/{total_categories} categories")

    async def init_categories(self, db: Session, use_cache: bool = True) -> bool:
        """Initialize categories data"""
        logger.info("Starting categories initialization")
        if use_cache:
            categories_data = await self._load_cached_data(db, "categories.json")
            if categories_data:
                await self.sync_categories(db, categories_data)
                logger.info("Categories initialized from cache")
                return True
            else:
                logger.warning("No cached categories data found")
                return False
        else:
            tcgcsv_service = self.get_service('tcgcsv')
            categories_data = await tcgcsv_service.get_categories()

            # Save the categories data
            await self._cache_data(
                db,
                categories_data,
                "categories.json",
                "tcgcsv/categories",
                file_type="json",
                content_type="application/json"
            )

            await self.sync_categories(db, categories_data)
            logger.info("Categories initialized from API")
            return True

    async def sync_groups(self, db: Session, groups_data: dict):
        """Sync groups data to the database in batches to handle large datasets."""
        groups = groups_data.get("results", [])
        batch_size = 1000  # Process in batches of 1000
        total_groups = len(groups)

        with db_transaction(db):
            for i in range(0, total_groups, batch_size):
                batch = groups[i:i + batch_size]
                for group_data in batch:
                    existing_group = db.query(TCGPlayerGroup).filter(TCGPlayerGroup.group_id == group_data["groupId"]).first()
                    if existing_group:
                        # Update existing group
                        for key, value in {
                            "name": group_data["name"],
                            "abbreviation": group_data.get("abbreviation"),
                            "is_supplemental": group_data.get("isSupplemental", False),
                            "published_on": datetime.fromisoformat(group_data["publishedOn"].replace("Z", "+00:00")) if group_data.get("publishedOn") else None,
                            "modified_on": datetime.fromisoformat(group_data["modifiedOn"].replace("Z", "+00:00")) if group_data.get("modifiedOn") else None,
                            "category_id": group_data.get("categoryId")
                        }.items():
                            setattr(existing_group, key, value)
                    else:
                        new_group = TCGPlayerGroup(
                            group_id=group_data["groupId"],
                            name=group_data["name"],
                            abbreviation=group_data.get("abbreviation"),
                            is_supplemental=group_data.get("isSupplemental", False),
                            published_on=datetime.fromisoformat(group_data["publishedOn"].replace("Z", "+00:00")) if group_data.get("publishedOn") else None,
                            modified_on=datetime.fromisoformat(group_data["modifiedOn"].replace("Z", "+00:00")) if group_data.get("modifiedOn") else None,
                            category_id=group_data.get("categoryId")
                        )
                        db.add(new_group)

                # Commit after each batch
                db.commit()
                logger.info(f"Processed {min(i + batch_size, total_groups)}/{total_groups} groups")

    async def init_groups(self, db: Session, use_cache: bool = True, game_ids: List[int] = None) -> bool:
        """Initialize groups data"""
        logger.info(f"Starting groups initialization for game IDs: {game_ids}")
        tcgcsv_service = self.get_service('tcgcsv')
        for game_id in game_ids:
            if use_cache:
                groups_data = await self._load_cached_data(db, f"groups_{game_id}.json")
                if groups_data:
                    await self.sync_groups(db, groups_data)
                    logger.info(f"Groups initialized from cache for game ID {game_id}")
                else:
                    logger.warning(f"No cached groups data found for game ID {game_id}")
                    return False
            else:
                groups_data = await tcgcsv_service.get_groups(game_id)

                # Save the groups data
                await self._cache_data(
                    db,
                    groups_data,
                    f"groups_{game_id}.json",
                    "tcgcsv/groups",
                    file_type="json",
                    content_type="application/json"
                )

                await self.sync_groups(db, groups_data)
                logger.info(f"Groups initialized from API for game ID {game_id}")
        return True

    async def sync_products(self, db: Session, products_data: str):
        """Sync products data to the database in batches to handle large datasets."""

        # Parse CSV data
        csv_reader = csv.DictReader(io.StringIO(products_data))
        products_list = list(csv_reader)
        batch_size = 1000  # Process in batches of 1000
        total_products = len(products_list)

        with db_transaction(db):
            for i in range(0, total_products, batch_size):
                batch = products_list[i:i + batch_size]
                for product_data in batch:
                    sub_type_name = product_data.get("subTypeName") if product_data.get("subTypeName") else "other"
                    existing_product = db.query(TCGPlayerProduct).filter(TCGPlayerProduct.tcgplayer_product_id == product_data["productId"]).filter(TCGPlayerProduct.sub_type_name == sub_type_name).first()
                    if existing_product:
                        # Update existing product
                        for key, value in {
                            "name": product_data["name"],
                            "clean_name": product_data.get("cleanName"),
                            "image_url": product_data.get("imageUrl"),
                            "sub_type_name": product_data.get("subTypeName") if product_data.get("subTypeName") else "other",
                            "normalized_sub_type_name": product_data.get("subTypeName").lower().replace(" ", "_") if product_data.get("subTypeName") else "other",
                            "category_id": product_data.get("categoryId"),
                            "group_id": product_data.get("groupId"),
                            "url": product_data.get("url"),
                            "modified_on": datetime.fromisoformat(product_data["modifiedOn"].replace("Z", "+00:00")) if product_data.get("modifiedOn") else None,
                            "image_count": product_data.get("imageCount", 0),
                            "ext_rarity": product_data.get("extRarity"),
                            "ext_subtype": product_data.get("extSubtype"),
                            "ext_oracle_text": product_data.get("extOracleText"),
                            "ext_number": product_data.get("extNumber"),
                            "low_price": float(product_data.get("lowPrice")) if product_data.get("lowPrice") else None,
                            "mid_price": float(product_data.get("midPrice")) if product_data.get("midPrice") else None,
                            "high_price": float(product_data.get("highPrice")) if product_data.get("highPrice") else None,
                            "market_price": float(product_data.get("marketPrice")) if product_data.get("marketPrice") else None,
                            "direct_low_price": float(product_data.get("directLowPrice")) if product_data.get("directLowPrice") else None,
                            "ext_power": product_data.get("extPower"),
                            "ext_toughness": product_data.get("extToughness"),
                            "ext_flavor_text": product_data.get("extFlavorText")
                        }.items():
                            setattr(existing_product, key, value)
                    else:
                        logger.debug(f"Creating new product: {product_data['productId']} product name: {product_data['name']}")
                        new_product = TCGPlayerProduct(
                            tcgplayer_product_id=product_data["productId"],
                            name=product_data["name"],
                            normalized_sub_type_name=product_data.get("subTypeName").lower().replace(" ", "_") if product_data.get("subTypeName") else "other",
                            clean_name=product_data.get("cleanName"),
                            image_url=product_data.get("imageUrl"),
                            category_id=product_data.get("categoryId"),
                            group_id=product_data.get("groupId"),
                            url=product_data.get("url"),
                            modified_on=datetime.fromisoformat(product_data["modifiedOn"].replace("Z", "+00:00")) if product_data.get("modifiedOn") else None,
                            image_count=product_data.get("imageCount", 0),
                            ext_rarity=product_data.get("extRarity"),
                            ext_subtype=product_data.get("extSubtype"),
                            ext_oracle_text=product_data.get("extOracleText"),
                            ext_number=product_data.get("extNumber"),
                            low_price=float(product_data.get("lowPrice")) if product_data.get("lowPrice") else None,
                            mid_price=float(product_data.get("midPrice")) if product_data.get("midPrice") else None,
                            high_price=float(product_data.get("highPrice")) if product_data.get("highPrice") else None,
                            market_price=float(product_data.get("marketPrice")) if product_data.get("marketPrice") else None,
                            direct_low_price=float(product_data.get("directLowPrice")) if product_data.get("directLowPrice") else None,
                            sub_type_name=product_data.get("subTypeName") if product_data.get("subTypeName") else "other",
                            ext_power=product_data.get("extPower"),
                            ext_toughness=product_data.get("extToughness"),
                            ext_flavor_text=product_data.get("extFlavorText")
                        )
                        db.add(new_product)

                # Commit after each batch
                db.commit()
                logger.info(f"Processed {min(i + batch_size, total_products)}/{total_products} products")

    async def init_products(self, db: Session, use_cache: bool = True, game_ids: List[int] = None) -> bool:
        """Initialize products data"""
        logger.info(f"Starting products initialization for game IDs: {game_ids}")
        tcgcsv_service = self.get_service('tcgcsv')
        for game_id in game_ids:
            groups = db.query(TCGPlayerGroup).filter(TCGPlayerGroup.category_id == game_id).all()
            logger.info(f"Processing {len(groups)} groups for game ID {game_id}")
            for group in groups:
                if use_cache:
                    products_data = await self._load_cached_data(db, f"products_{game_id}_{group.group_id}.json")
                    if products_data:
                        await self.sync_products(db, products_data)
                        logger.info(f"Products initialized from cache for group {group.group_id}")
                    else:
                        logger.warning(f"No cached products data found for group {group.group_id}")
                        continue
                else:
                    # Get CSV data from API
                    csv_data = await tcgcsv_service.get_products_and_prices(game_id, group.group_id)

                    # Save the CSV file
                    await self.file_service.save_file(
                        db,
                        csv_data,
                        f"products_{game_id}_{group.group_id}.csv",
                        "tcgcsv/products",
                        file_type="csv",
                        content_type="text/csv"
                    )

                    # Parse and sync the CSV data
                    await self.sync_products(db, csv_data)
                    logger.info(f"Products initialized from API for group {group.group_id}")
        return True

    async def sync_archived_prices(self, db: Session, archived_prices_data: dict, date: datetime):
        """Sync archived prices data to the database using bulk operations.

        Note: Historical prices are never updated, only new records are inserted."""

        if not archived_prices_data.get("success"):
            logger.error("Price data sync failed - success flag is false")
            return

        # Get existing records in bulk to avoid duplicates using a composite key
        existing_records = db.query(TCGPlayerPriceHistory).filter(
            TCGPlayerPriceHistory.date == date
        ).all()
        existing_keys = {(r.product_id, r.date, r.sub_type_name) for r in existing_records}

        # Prepare batch insert data
        price_history_batch = []

        # Process price data in batches
        for price_data in archived_prices_data.get("results", []):
            try:
                # Get the subtype name from the price data
                sub_type_name = price_data.get("subTypeName", "None")

                # First try to find product with the requested subtype
                product = db.query(TCGPlayerProduct).filter(
                    TCGPlayerProduct.tcgplayer_product_id == price_data["productId"],
                    TCGPlayerProduct.sub_type_name == sub_type_name
                ).first()

                # If not found and subtype isn't "other", try with "other" subtype
                if not product and sub_type_name != "other":
                    product = db.query(TCGPlayerProduct).filter(
                        TCGPlayerProduct.tcgplayer_product_id == price_data["productId"],
                        TCGPlayerProduct.sub_type_name == "other"
                    ).first()
                    if product:
                        sub_type_name = "other"
                        #logger.info(f"Found product {price_data['productId']} with 'other' subtype as fallback for {sub_type_name}")

                if not product:
                    logger.warning(f"No product found for {price_data['productId']} with subtype {sub_type_name} or 'other'")
                    continue

                # Skip if record already exists
                if (product.tcgplayer_product_id, date, sub_type_name) in existing_keys:
                    continue

                # Validate and convert price data
                try:
                    price_history = TCGPlayerPriceHistory(
                        product_id=product.tcgplayer_product_id,
                        sub_type_name=sub_type_name,
                        date=date,
                        low_price=float(price_data.get("lowPrice")) if price_data.get("lowPrice") else None,
                        mid_price=float(price_data.get("midPrice")) if price_data.get("midPrice") else None,
                        high_price=float(price_data.get("highPrice")) if price_data.get("highPrice") else None,
                        market_price=float(price_data.get("marketPrice")) if price_data.get("marketPrice") else None,
                        direct_low_price=float(price_data.get("directLowPrice")) if price_data.get("directLowPrice") else None
                    )
                    price_history_batch.append(price_history)
                except (ValueError, TypeError) as e:
                    logger.error(f"Invalid price data for product {price_data['productId']}: {str(e)}")
                    continue

                # Process in batches of 1000
                if len(price_history_batch) >= 1000:
                    with db_transaction(db):
                        db.bulk_save_objects(price_history_batch)
                    price_history_batch = []

            except Exception as e:
                logger.error(f"Error processing price data for product {price_data['productId']}: {str(e)}")
                continue

        # Process any remaining records
        if price_history_batch:
            with db_transaction(db):
                db.bulk_save_objects(price_history_batch)

    async def init_archived_prices(self, db: Session, start_date: datetime, end_date: datetime, use_cache: bool = True, game_ids: List[int] = None) -> bool:
        """Initialize archived prices data"""
        logger.info(f"Starting archived prices initialization from {start_date} to {end_date}")
        tcgcsv_service = self.get_service('tcgcsv')
        processed_dates = await tcgcsv_service.get_tcgcsv_date_range(start_date, end_date)
        logger.info(f"Processing {len(processed_dates)} dates")

        # Convert game_ids to a set for faster lookups
        desired_game_ids = set(game_ids) if game_ids else set()

        for date in processed_dates:
            date_path = f"app/data/cache/tcgcsv/prices/{date}"

            # Check if we already have the data for this date
            if use_cache and os.path.exists(date_path):
                logger.info(f"Using cached price data for {date}")
            else:
                logger.info(f"Downloading and processing archived prices for {date}")
                # Download and extract the archive
                archive_data = await tcgcsv_service.get_archived_prices_for_date(date)

                # Save the archive file
                file_record = await self.file_service.save_file(
                    db,
                    archive_data,
                    f"prices-{date}.ppmd.7z",
                    "tcgcsv/prices/zip",
                    file_type="application/x-7z-compressed",
                    content_type="application/x-7z-compressed"
                )

                # Extract the 7z file to a temporary directory
                temp_extract_path = f"app/data/cache/tcgcsv/prices/temp_{date}"
                os.makedirs(temp_extract_path, exist_ok=True)

                with py7zr.SevenZipFile(file_record.path, 'r') as archive:
                    archive.extractall(path=temp_extract_path)

                # Find the date subdirectory in the temp directory
                date_subdir = os.path.join(temp_extract_path, str(date))
                if os.path.exists(date_subdir):
                    # Remove existing directory if it exists
                    if os.path.exists(date_path):
                        shutil.rmtree(date_path)

                    # Create the destination directory
                    os.makedirs(date_path, exist_ok=True)

                    # Move contents from the date subdirectory to the final path
                    for item in os.listdir(date_subdir):
                        src = os.path.join(date_subdir, item)
                        dst = os.path.join(date_path, item)
                        os.rename(src, dst)

                    # Clean up the temporary directory
                    os.rmdir(date_subdir)
                    os.rmdir(temp_extract_path)

            # Process each category directory
            for category_id in os.listdir(date_path):
                # Skip categories that aren't in our desired game IDs
                if int(category_id) not in desired_game_ids:
                    continue

                category_path = os.path.join(date_path, category_id)
                if not os.path.isdir(category_path):
                    continue

                # Process each group directory
                for group_id in os.listdir(category_path):
                    group_path = os.path.join(category_path, group_id)
                    if not os.path.isdir(group_path):
                        continue

                    # Process the prices file
                    prices_file = os.path.join(group_path, "prices")
                    if not os.path.exists(prices_file):
                        continue

                    try:
                        with open(prices_file, 'r') as f:
                            price_data = json.load(f)
                            if price_data.get("success"):
                                await self.sync_archived_prices(db, price_data, datetime.strptime(date, "%Y-%m-%d"))
                                logger.info(f"Processed prices for category {category_id}, group {group_id} on {date}")
                    except Exception as e:
                        logger.error(f"Error processing prices file {prices_file}: {str(e)}")
                        continue

        return True

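    # Expected on-disk layout after extraction (as iterated above; paths are
    # relative to app/data/cache/tcgcsv/prices/):
    #
    #     {date}/{category_id}/{group_id}/prices
    #
    # where each "prices" file is a JSON document with a "success" flag and a
    # "results" list, consumed by sync_archived_prices().
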
    async def init_mtgjson(self, db: Session, use_cache: bool = True) -> Dict[str, Any]:
        """Initialize MTGJSON data"""
        logger.info("Starting MTGJSON initialization")
        mtgjson_service = self.get_service('mtgjson')
        identifiers_count = 0
        skus_count = 0

        # Get identifiers data
        identifiers_data = await mtgjson_service.get_identifiers(db, use_cache)
        if identifiers_data and "data" in identifiers_data:
            identifiers_count = await self.sync_mtgjson_identifiers(db, list(identifiers_data["data"].values()))

        # Get SKUs data
        skus_data = await mtgjson_service.get_skus(db, use_cache)
        if skus_data and "data" in skus_data:
            skus_count = await self.sync_mtgjson_skus(db, skus_data)

        return {
            "identifiers_processed": identifiers_count,
            "skus_processed": skus_count
        }

    async def sync_mtgjson_identifiers(self, db: Session, identifiers_data: List[dict]) -> int:
        """Sync MTGJSON card identifiers to the database; returns the number of cards processed."""
        count = 0
        with db_transaction(db):
            # Load all existing UUIDs once
            existing_cards = {
                card.mtgjson_uuid: card
                for card in db.query(MTGJSONCard).all()
            }

            new_cards = []

            for card_data in identifiers_data:
                if not isinstance(card_data, dict):
                    logger.debug(f"Skipping non-dict item: {card_data}")
                    continue

                uuid = card_data.get("uuid")
                identifiers = card_data.get("identifiers", {})

                if uuid in existing_cards:
                    card = existing_cards[uuid]
                    updates = {
                        "name": card_data.get("name"),
                        "set_code": card_data.get("setCode"),
                        "abu_id": identifiers.get("abuId"),
                        "card_kingdom_etched_id": identifiers.get("cardKingdomEtchedId"),
                        "card_kingdom_foil_id": identifiers.get("cardKingdomFoilId"),
                        "card_kingdom_id": identifiers.get("cardKingdomId"),
                        "cardsphere_id": identifiers.get("cardsphereId"),
                        "cardsphere_foil_id": identifiers.get("cardsphereFoilId"),
                        "cardtrader_id": identifiers.get("cardtraderId"),
                        "csi_id": identifiers.get("csiId"),
                        "mcm_id": identifiers.get("mcmId"),
                        "mcm_meta_id": identifiers.get("mcmMetaId"),
                        "miniaturemarket_id": identifiers.get("miniaturemarketId"),
                        "mtg_arena_id": identifiers.get("mtgArenaId"),
                        "mtgjson_foil_version_id": identifiers.get("mtgjsonFoilVersionId"),
                        "mtgjson_non_foil_version_id": identifiers.get("mtgjsonNonFoilVersionId"),
                        "mtgjson_v4_id": identifiers.get("mtgjsonV4Id"),
                        "mtgo_foil_id": identifiers.get("mtgoFoilId"),
                        "mtgo_id": identifiers.get("mtgoId"),
                        "multiverse_id": identifiers.get("multiverseId"),
                        "scg_id": identifiers.get("scgId"),
                        "scryfall_id": identifiers.get("scryfallId"),
                        "scryfall_card_back_id": identifiers.get("scryfallCardBackId"),
                        "scryfall_oracle_id": identifiers.get("scryfallOracleId"),
                        "scryfall_illustration_id": identifiers.get("scryfallIllustrationId"),
                        "tcgplayer_product_id": identifiers.get("tcgplayerProductId"),
                        "tcgplayer_etched_product_id": identifiers.get("tcgplayerEtchedProductId"),
                        "tnt_id": identifiers.get("tntId")
                    }

                    for k, v in updates.items():
                        if getattr(card, k) != v:
                            setattr(card, k, v)

                else:
                    new_cards.append(MTGJSONCard(
                        mtgjson_uuid=uuid,
                        name=card_data.get("name"),
                        set_code=card_data.get("setCode"),
                        abu_id=identifiers.get("abuId"),
                        card_kingdom_etched_id=identifiers.get("cardKingdomEtchedId"),
                        card_kingdom_foil_id=identifiers.get("cardKingdomFoilId"),
                        card_kingdom_id=identifiers.get("cardKingdomId"),
                        cardsphere_id=identifiers.get("cardsphereId"),
                        cardsphere_foil_id=identifiers.get("cardsphereFoilId"),
                        cardtrader_id=identifiers.get("cardtraderId"),
                        csi_id=identifiers.get("csiId"),
                        mcm_id=identifiers.get("mcmId"),
                        mcm_meta_id=identifiers.get("mcmMetaId"),
                        miniaturemarket_id=identifiers.get("miniaturemarketId"),
                        mtg_arena_id=identifiers.get("mtgArenaId"),
                        mtgjson_foil_version_id=identifiers.get("mtgjsonFoilVersionId"),
                        mtgjson_non_foil_version_id=identifiers.get("mtgjsonNonFoilVersionId"),
                        mtgjson_v4_id=identifiers.get("mtgjsonV4Id"),
                        mtgo_foil_id=identifiers.get("mtgoFoilId"),
                        mtgo_id=identifiers.get("mtgoId"),
                        multiverse_id=identifiers.get("multiverseId"),
                        scg_id=identifiers.get("scgId"),
                        scryfall_id=identifiers.get("scryfallId"),
                        scryfall_card_back_id=identifiers.get("scryfallCardBackId"),
                        scryfall_oracle_id=identifiers.get("scryfallOracleId"),
                        scryfall_illustration_id=identifiers.get("scryfallIllustrationId"),
                        tcgplayer_product_id=identifiers.get("tcgplayerProductId"),
                        tcgplayer_etched_product_id=identifiers.get("tcgplayerEtchedProductId"),
                        tnt_id=identifiers.get("tntId")
                    ))

                count += 1

            if new_cards:
                db.bulk_save_objects(new_cards)

        return count

    async def sync_mtgjson_skus(self, db: Session, skus_data: dict) -> int:
        """Sync MTGJSON SKUs to the database; returns the number of new SKUs created."""
        count = 0
        sku_details_by_key = {}

        for mtgjson_uuid, product_data in skus_data["data"].items():
            for sku_data in product_data:
                sku_id = sku_data.get("skuId")
                if sku_id is None or sku_id in sku_details_by_key:
                    continue  # Skip if missing or already added

                sku_details_by_key[sku_id] = {
                    "mtgjson_uuid": mtgjson_uuid,
                    "tcgplayer_sku_id": sku_id,
                    "tcgplayer_product_id": sku_data.get("productId"),
                    "printing": sku_data.get("printing"),
                    "normalized_printing": sku_data.get("printing", "").lower().replace(" ", "_").replace("non_foil", "normal") if sku_data.get("printing") else None,
                    "condition": sku_data.get("condition"),
                    "finish": sku_data.get("finish"),
                    "language": sku_data.get("language"),
                }

        with db_transaction(db):
            db.flush()

            valid_uuids = {uuid for (uuid,) in db.query(MTGJSONCard.mtgjson_uuid).all()}
            valid_product_keys = {
                (product.tcgplayer_product_id, product.normalized_sub_type_name)
                for product in db.query(TCGPlayerProduct.tcgplayer_product_id, TCGPlayerProduct.normalized_sub_type_name)
            }

            existing_sku_ids = {
                sku.tcgplayer_sku_id
                for sku in db.query(MTGJSONSKU.tcgplayer_sku_id).all()
            }

            existing = {
                (sku.mtgjson_uuid, sku.tcgplayer_sku_id): sku
                for sku in db.query(MTGJSONSKU).all()
            }

            new_skus = []

            for data in sku_details_by_key.values():
                sku_id = data["tcgplayer_sku_id"]

                if sku_id in existing_sku_ids:
                    continue

                mtgjson_uuid = data["mtgjson_uuid"]
                product_id = data["tcgplayer_product_id"]
                normalized_printing = data["normalized_printing"]

                if mtgjson_uuid not in valid_uuids:
                    continue

                if (product_id, normalized_printing) not in valid_product_keys:
                    continue

                key = (mtgjson_uuid, sku_id)

                if key in existing:
                    record = existing[key]
                    for field, value in data.items():
                        if field not in ("mtgjson_uuid", "tcgplayer_sku_id") and getattr(record, field) != value:
                            setattr(record, field, value)
                else:
                    new_skus.append(MTGJSONSKU(**data))
                    count += 1

            if new_skus:
                db.bulk_save_objects(new_skus)

        return count

    async def initialize_data(
        self,
        db: Session,
        game_ids: List[int],
        use_cache: bool = False,
        init_categories: bool = True,
        init_groups: bool = True,
        init_products: bool = True,
        init_archived_prices: bool = True,
        archived_prices_start_date: Optional[str] = None,
        archived_prices_end_date: Optional[str] = None,
        init_mtgjson: bool = True
    ) -> Dict[str, Any]:
        """Initialize 3rd party API data loads with configurable steps"""
        logger.info("Starting data initialization process")
        results = {}
        if init_categories:
            logger.info("Initializing categories...")
            results["categories"] = await self.init_categories(db, use_cache)
        if init_groups:
            logger.info("Initializing groups...")
            results["groups"] = await self.init_groups(db, use_cache, game_ids)
        if init_products:
            logger.info("Initializing products...")
            results["products"] = await self.init_products(db, use_cache, game_ids)
        if init_archived_prices:
            logger.info("Initializing archived prices...")
            results["archived_prices"] = await self.init_archived_prices(
                db,
                archived_prices_start_date,
                archived_prices_end_date,
                use_cache,
                game_ids
            )
        if init_mtgjson:
            logger.info("Initializing MTGJSON data...")
            results["mtgjson"] = await self.init_mtgjson(db, use_cache)

        logger.info("Data initialization completed")
        return results

    async def clear_cache(self, db: Session) -> None:
        """Clear all cached data"""
        # Delete all cached files in the categories, groups, and products directories
        for subdir in ["categories", "groups", "products"]:
            files = await self.file_service.list_files(db, file_type="json")
            for file in files:
                if file.path.startswith(subdir):
                    await self.file_service.delete_file(db, file.id)
        await self.mtgjson_service.clear_cache()
        logger.info("Cache cleared")

    async def initialize_inventory_data(self, db: Session) -> None:
        """Initialize inventory data"""
        with db_transaction(db):
            logger.info("Initializing inventory data...")
            # set expected value
            expected_value_box = SealedExpectedValue(
                tcgplayer_product_id=619645,
                expected_value=136.42
            )
            db.add(expected_value_box)
            #db.flush()
            #expected_value_case = SealedExpectedValue(
            #    tcgplayer_product_id=562119,
            #    expected_value=820.69
            #)
            #db.add(expected_value_case)
            db.flush()

            inventory_service = self.get_service("inventory")
            customer = await inventory_service.create_customer(db, "Bob Smith")
            vendor = await inventory_service.create_vendor(db, "Joe Blow")
            marketplace = await inventory_service.create_marketplace(db, "Tcgplayer")
            transaction = await inventory_service.create_purchase_transaction(db, PurchaseTransactionCreate(
                vendor_id=vendor.id,
                transaction_date=datetime.now(),
                items=[PurchaseItem(product_id=619645, unit_price=100, quantity=1, is_case=False)],
                transaction_notes="tdm real box test"
                #PurchaseItem(product_id=562119, unit_price=800.01, quantity=2, is_case=True, num_boxes=6)],
                #transaction_notes="Test Transaction: 1 case and 2 boxes of foundations"
            ))
            logger.info(f"Transaction created: {transaction}")
            case_num = 0
            for item in transaction.transaction_items:
                logger.info(f"Item: {item}")
                if item.inventory_item.physical_item.item_type == "box":
                    manabox_service = self.get_service("manabox")
                    #file_path = 'app/data/test_data/manabox_test_file.csv'
                    file_path = 'app/data/test_data/tdmtest.csv'
                    with open(file_path, 'rb') as f:
                        file_bytes = f.read()
                    manabox_file = await manabox_service.process_manabox_csv(db, file_bytes, {"source": "test", "description": "test"}, wait=True)
                    # Ensure manabox_file is a list before passing it
                    if not isinstance(manabox_file, list):
                        manabox_file = [manabox_file]
                    box_service = self.get_service("box")
                    open_event = await box_service.open_box(db, item.inventory_item.physical_item, manabox_file)
                    # get all cards from box
                    cards = open_event.resulting_items if open_event.resulting_items else []
                    marketplace_listing_service = self.get_service("marketplace_listing")
                    for card in cards:
                        logger.info(f"card: {card}")
                        # create marketplace listing
                        await marketplace_listing_service.create_marketplace_listing(db, card.inventory_item, marketplace)
                elif item.inventory_item.physical_item.item_type == "case":
                    if case_num == 0:
                        logger.info(f"sealed case {case_num} opening...")
                        case_service = self.get_service("case")
                        success = await case_service.open_case(db, item.inventory_item.physical_item, 562119)
                        logger.info(f"sealed case {case_num} opening success: {success}")
                    case_num += 1

            logger.info("Inventory data initialized")
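
# ---------------------------------------------------------------------------
# Example usage (illustrative sketch only; nothing below is executed by this
# module). It assumes an async entrypoint, a session factory exported by
# app.db.database (named `SessionLocal` here purely for illustration), and an
# application context in which the services resolved via get_service() are
# registered. Adjust to the project's actual wiring.
#
#   import asyncio
#   from app.db.database import SessionLocal  # assumed name
#
#   async def main() -> None:
#       service = DataInitializationService()
#       with SessionLocal() as db:
#           results = await service.initialize_data(
#               db,
#               game_ids=[1],                 # TCGplayer category IDs to load
#               use_cache=True,               # reuse files cached in the last 7 days
#               init_archived_prices=False,   # skip the large price archives
#           )
#           print(results)
#
#   asyncio.run(main())
# ---------------------------------------------------------------------------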