2025-04-12 23:51:17 -04:00
parent df6490cab0
commit 56c2d1de47
2870 changed files with 3700 additions and 206 deletions

View File

@@ -26,8 +26,9 @@ class BaseExternalService:
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
data: Optional[Dict[str, Any]] = None,
content_type: str = "application/json"
) -> Union[Dict[str, Any], str]:
content_type: str = "application/json",
binary: bool = False
) -> Union[Dict[str, Any], str, bytes]:
session = await self._get_session()
url = f"{self.base_url}{endpoint}"
@@ -43,6 +44,9 @@ class BaseExternalService:
response_content_type = response.headers.get('content-type', '').lower()
logger.info(f"Making request to {url}")
if binary:
return await response.read()
# Get the raw response text first
raw_response = await response.text()
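# Hypothetical caller sketch (the endpoint is illustrative only):
#   payload = await self._make_request("GET", "archive/prices.7z", binary=True)
#   # with binary=True the raw bytes are returned before any text/JSON parsing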

View File

@@ -0,0 +1,312 @@
import os
import json
import zipfile
import aiohttp
import time
from typing import Dict, Any, Generator
from sqlalchemy.orm import Session
from app.models.mtgjson_card import MTGJSONCard
from app.models.mtgjson_sku import MTGJSONSKU
class MTGJSONService:
def __init__(self, cache_dir: str = "app/data/cache/mtgjson", batch_size: int = 1000):
self.cache_dir = cache_dir
self.identifiers_dir = os.path.join(cache_dir, "identifiers")
self.skus_dir = os.path.join(cache_dir, "skus")
self.batch_size = batch_size
# Create necessary directories
os.makedirs(cache_dir, exist_ok=True)
os.makedirs(self.identifiers_dir, exist_ok=True)
os.makedirs(self.skus_dir, exist_ok=True)
def _format_progress(self, current: int, total: int, start_time: float) -> str:
"""Format a progress message with percentage and timing information"""
elapsed = time.time() - start_time
if total > 0:
percent = (current / total) * 100
items_per_second = current / elapsed if elapsed > 0 else 0
eta = (total - current) / items_per_second if items_per_second > 0 else 0
return f"[{current}/{total} ({percent:.1f}%)] {items_per_second:.1f} items/sec, ETA: {eta:.1f}s"
return f"[{current} items] {current/elapsed:.1f} items/sec"
def _print_progress(self, message: str, end: str = "\n") -> None:
"""Print progress message with flush"""
print(message, end=end, flush=True)
async def _download_file(self, url: str, output_path: str) -> None:
"""Download a file from the given URL to the specified path using streaming"""
print(f"Downloading {url}...")
start_time = time.time()
total_size = 0
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
total_size = int(response.headers.get('content-length', 0))
with open(output_path, 'wb') as f:
downloaded = 0
async for chunk in response.content.iter_chunked(8192):
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = (downloaded / total_size) * 100
elapsed = time.time() - start_time
speed = downloaded / elapsed / 1024 / 1024 # MB/s
print(f"\rDownloading: {percent:.1f}% ({downloaded/1024/1024:.1f}MB/{total_size/1024/1024:.1f}MB) at {speed:.1f}MB/s", end="")
print("\nDownload complete!")
else:
raise Exception(f"Failed to download file from {url}. Status: {response.status}")
async def _unzip_file(self, zip_path: str, extract_dir: str) -> str:
"""Unzip a file to the specified directory and return the path to the extracted JSON file"""
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
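# Assumes the archive holds a single JSON file (true for MTGJSON downloads),
# so the first entry in the listing is the payload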
json_filename = zip_ref.namelist()[0]
zip_ref.extractall(extract_dir)
return os.path.join(extract_dir, json_filename)
def _stream_json_file(self, file_path: str) -> Generator[Dict[str, Any], None, None]:
"""Stream a JSON file and yield items one at a time"""
print(f"Starting to stream JSON file: {file_path}")
with open(file_path, 'r') as f:
# MTGJSON files are one JSON object, so the whole file is loaded before iterating
data = json.load(f)
# First yield the meta data
if "meta" in data:
yield {"type": "meta", "data": data["meta"]}
# Then yield each item in the data section
if "data" in data:
for key, value in data["data"].items():
yield {"type": "item", "data": {key: value}}
async def _process_batch(self, db: Session, items: list, model_class, commit: bool = True) -> int:
"""Process a batch of items and add them to the database"""
processed = 0
for item in items:
if model_class == MTGJSONCard:
# Check if card already exists
existing_card = db.query(MTGJSONCard).filter(MTGJSONCard.card_id == item["card_id"]).first()
if existing_card:
continue
new_item = MTGJSONCard(
card_id=item["card_id"],
name=item["name"],
set_code=item["set_code"],
uuid=item["uuid"],
abu_id=item.get("abu_id"),
card_kingdom_etched_id=item.get("card_kingdom_etched_id"),
card_kingdom_foil_id=item.get("card_kingdom_foil_id"),
card_kingdom_id=item.get("card_kingdom_id"),
cardsphere_id=item.get("cardsphere_id"),
cardsphere_foil_id=item.get("cardsphere_foil_id"),
cardtrader_id=item.get("cardtrader_id"),
csi_id=item.get("csi_id"),
mcm_id=item.get("mcm_id"),
mcm_meta_id=item.get("mcm_meta_id"),
miniaturemarket_id=item.get("miniaturemarket_id"),
mtg_arena_id=item.get("mtg_arena_id"),
mtgjson_foil_version_id=item.get("mtgjson_foil_version_id"),
mtgjson_non_foil_version_id=item.get("mtgjson_non_foil_version_id"),
mtgjson_v4_id=item.get("mtgjson_v4_id"),
mtgo_foil_id=item.get("mtgo_foil_id"),
mtgo_id=item.get("mtgo_id"),
multiverse_id=item.get("multiverse_id"),
scg_id=item.get("scg_id"),
scryfall_id=item.get("scryfall_id"),
scryfall_card_back_id=item.get("scryfall_card_back_id"),
scryfall_oracle_id=item.get("scryfall_oracle_id"),
scryfall_illustration_id=item.get("scryfall_illustration_id"),
tcgplayer_product_id=item.get("tcgplayer_product_id"),
tcgplayer_etched_product_id=item.get("tcgplayer_etched_product_id"),
tnt_id=item.get("tnt_id")
)
else: # MTGJSONSKU
# Check if SKU already exists
existing_sku = db.query(MTGJSONSKU).filter(MTGJSONSKU.sku_id == item["sku_id"]).first()
if existing_sku:
continue
new_item = MTGJSONSKU(
sku_id=str(item["sku_id"]),
product_id=str(item["product_id"]),
condition=item["condition"],
finish=item["finish"],
language=item["language"],
printing=item["printing"],
card_id=item["card_id"]
)
db.add(new_item)
processed += 1
if commit:
try:
db.commit()
except Exception as e:
db.rollback()
raise
return processed
async def download_and_process_identifiers(self, db: Session) -> Dict[str, int]:
"""Download, unzip and process AllIdentifiers.json.zip using streaming"""
self._print_progress("Starting MTGJSON identifiers processing...")
start_time = time.time()
zip_path = os.path.join(self.identifiers_dir, "AllIdentifiers.json.zip")
await self._download_file(
"https://mtgjson.com/api/v5/AllIdentifiers.json.zip",
zip_path
)
self._print_progress("Unzipping file...")
json_path = await self._unzip_file(zip_path, self.identifiers_dir)
cards_processed = 0
current_batch = []
total_cards = 0
last_progress_time = time.time()
self._print_progress("Processing cards...")
try:
for item in self._stream_json_file(json_path):
if item["type"] == "meta":
self._print_progress(f"Processing MTGJSON data version {item['data'].get('version')} from {item['data'].get('date')}")
continue
card_data = item["data"]
card_id = list(card_data.keys())[0]
card_info = card_data[card_id]
total_cards += 1
current_batch.append({
"card_id": card_id,
"name": card_info.get("name"),
"set_code": card_info.get("setCode"),
"uuid": card_info.get("uuid"),
"abu_id": card_info.get("identifiers", {}).get("abuId"),
"card_kingdom_etched_id": card_info.get("identifiers", {}).get("cardKingdomEtchedId"),
"card_kingdom_foil_id": card_info.get("identifiers", {}).get("cardKingdomFoilId"),
"card_kingdom_id": card_info.get("identifiers", {}).get("cardKingdomId"),
"cardsphere_id": card_info.get("identifiers", {}).get("cardsphereId"),
"cardsphere_foil_id": card_info.get("identifiers", {}).get("cardsphereFoilId"),
"cardtrader_id": card_info.get("identifiers", {}).get("cardtraderId"),
"csi_id": card_info.get("identifiers", {}).get("csiId"),
"mcm_id": card_info.get("identifiers", {}).get("mcmId"),
"mcm_meta_id": card_info.get("identifiers", {}).get("mcmMetaId"),
"miniaturemarket_id": card_info.get("identifiers", {}).get("miniaturemarketId"),
"mtg_arena_id": card_info.get("identifiers", {}).get("mtgArenaId"),
"mtgjson_foil_version_id": card_info.get("identifiers", {}).get("mtgjsonFoilVersionId"),
"mtgjson_non_foil_version_id": card_info.get("identifiers", {}).get("mtgjsonNonFoilVersionId"),
"mtgjson_v4_id": card_info.get("identifiers", {}).get("mtgjsonV4Id"),
"mtgo_foil_id": card_info.get("identifiers", {}).get("mtgoFoilId"),
"mtgo_id": card_info.get("identifiers", {}).get("mtgoId"),
"multiverse_id": card_info.get("identifiers", {}).get("multiverseId"),
"scg_id": card_info.get("identifiers", {}).get("scgId"),
"scryfall_id": card_info.get("identifiers", {}).get("scryfallId"),
"scryfall_card_back_id": card_info.get("identifiers", {}).get("scryfallCardBackId"),
"scryfall_oracle_id": card_info.get("identifiers", {}).get("scryfallOracleId"),
"scryfall_illustration_id": card_info.get("identifiers", {}).get("scryfallIllustrationId"),
"tcgplayer_product_id": card_info.get("identifiers", {}).get("tcgplayerProductId"),
"tcgplayer_etched_product_id": card_info.get("identifiers", {}).get("tcgplayerEtchedProductId"),
"tnt_id": card_info.get("identifiers", {}).get("tntId"),
"data": card_info
})
if len(current_batch) >= self.batch_size:
batch_processed = await self._process_batch(db, current_batch, MTGJSONCard)
cards_processed += batch_processed
current_batch = []
current_time = time.time()
if current_time - last_progress_time >= 1.0: # Update progress every second
self._print_progress(f"\r{self._format_progress(cards_processed, total_cards, start_time)}", end="")
last_progress_time = current_time
except Exception as e:
self._print_progress(f"\nError during processing: {str(e)}")
raise
# Process remaining items
if current_batch:
batch_processed = await self._process_batch(db, current_batch, MTGJSONCard)
cards_processed += batch_processed
total_time = time.time() - start_time
self._print_progress(f"\nProcessing complete! Processed {cards_processed} cards in {total_time:.1f} seconds")
return {"cards_processed": cards_processed}
async def download_and_process_skus(self, db: Session) -> Dict[str, int]:
"""Download, unzip and process TcgplayerSkus.json.zip using streaming"""
self._print_progress("Starting MTGJSON SKUs processing...")
start_time = time.time()
zip_path = os.path.join(self.skus_dir, "TcgplayerSkus.json.zip")
await self._download_file(
"https://mtgjson.com/api/v5/TcgplayerSkus.json.zip",
zip_path
)
self._print_progress("Unzipping file...")
json_path = await self._unzip_file(zip_path, self.skus_dir)
skus_processed = 0
current_batch = []
total_skus = 0
last_progress_time = time.time()
self._print_progress("Processing SKUs...")
try:
for item in self._stream_json_file(json_path):
if item["type"] == "meta":
self._print_progress(f"Processing MTGJSON SKUs version {item['data'].get('version')} from {item['data'].get('date')}")
continue
# The data structure is {card_uuid: [sku1, sku2, ...]}
for card_uuid, sku_list in item["data"].items():
for sku in sku_list:
total_skus += 1
current_batch.append({
"sku_id": str(sku.get("skuId")),
"product_id": str(sku.get("productId")),
"condition": sku.get("condition"),
"finish": sku.get("finish", "NORMAL"), # Default to NORMAL if not specified
"language": sku.get("language"),
"printing": sku.get("printing"),
"card_id": card_uuid,
"data": sku
})
if len(current_batch) >= self.batch_size:
batch_processed = await self._process_batch(db, current_batch, MTGJSONSKU)
skus_processed += batch_processed
current_batch = []
current_time = time.time()
if current_time - last_progress_time >= 1.0: # Update progress every second
self._print_progress(f"\r{self._format_progress(skus_processed, total_skus, start_time)}", end="")
last_progress_time = current_time
except Exception as e:
self._print_progress(f"\nError during processing: {str(e)}")
raise
# Process remaining items
if current_batch:
batch_processed = await self._process_batch(db, current_batch, MTGJSONSKU)
skus_processed += batch_processed
total_time = time.time() - start_time
self._print_progress(f"\nProcessing complete! Processed {skus_processed} SKUs in {total_time:.1f} seconds")
return {"skus_processed": skus_processed}
async def clear_cache(self) -> None:
"""Clear all cached data"""
for subdir in ["identifiers", "skus"]:
dir_path = os.path.join(self.cache_dir, subdir)
if os.path.exists(dir_path):
for filename in os.listdir(dir_path):
file_path = os.path.join(dir_path, filename)
if os.path.isfile(file_path):
os.unlink(file_path)
print("MTGJSON cache cleared")

View File

@@ -1,5 +1,5 @@
from typing import List, Dict, Any
from datetime import datetime
from datetime import datetime, timedelta
import csv
import io
from app.services.external_api.base_external_service import BaseExternalService
@@ -7,21 +7,23 @@ from app.models.tcgplayer_group import TCGPlayerGroup
from app.models.tcgplayer_product import TCGPlayerProduct
from app.models.tcgplayer_category import TCGPlayerCategory
from sqlalchemy.orm import Session
import py7zr
import os
class TCGCSVService(BaseExternalService):
def __init__(self):
super().__init__(base_url="https://tcgcsv.com/tcgplayer/")
super().__init__(base_url="https://tcgcsv.com/")
async def get_groups(self, game_ids: List[int]) -> Dict[str, Any]:
"""Fetch groups for specific game IDs from TCGCSV API"""
game_ids_str = ",".join(map(str, game_ids))
endpoint = f"{game_ids_str}/groups"
endpoint = f"tcgplayer/{game_ids_str}/groups"
return await self._make_request("GET", endpoint)
async def get_products_and_prices(self, game_ids: List[int], group_id: int) -> List[Dict[str, Any]]:
"""Fetch products and prices for a specific group from TCGCSV API"""
game_ids_str = ",".join(map(str, game_ids))
endpoint = f"{game_ids_str}/{group_id}/ProductsAndPrices.csv"
endpoint = f"tcgplayer/{game_ids_str}/{group_id}/ProductsAndPrices.csv"
response = await self._make_request("GET", endpoint, headers={"Accept": "text/csv"})
# Parse CSV response
@@ -31,8 +33,63 @@ class TCGCSVService(BaseExternalService):
async def get_categories(self) -> Dict[str, Any]:
"""Fetch all categories from TCGCSV API"""
endpoint = "categories"
endpoint = "tcgplayer/categories"
return await self._make_request("GET", endpoint)
async def get_archived_prices_for_date(self, date_str: str) -> str:
"""Fetch archived prices from TCGCSV API"""
# Check if the date directory already exists
extract_path = f"app/data/cache/tcgcsv/prices/{date_str}"
if os.path.exists(extract_path):
print(f"Prices for date {date_str} already exist, skipping download")
return date_str
# Download the archive file
endpoint = f"archive/tcgplayer/prices-{date_str}.ppmd.7z"
response = await self._make_request("GET", endpoint, binary=True)
# Save the archive file
archive_path = f"app/data/cache/tcgcsv/prices/zip/prices-{date_str}.ppmd.7z"
os.makedirs(os.path.dirname(archive_path), exist_ok=True)
with open(archive_path, "wb") as f:
f.write(response)
# Extract the 7z file
with py7zr.SevenZipFile(archive_path, 'r') as archive:
# Extract to a directory named after the date
os.makedirs(extract_path, exist_ok=True)
archive.extractall(path=extract_path)
# The extracted files will be in a directory structure like:
# {date_str}/{game_id}/{group_id}/prices
return date_str
async def get_archived_prices_for_date_range(self, start_date: str, end_date: str) -> None:
"""Fetch archived prices for a date range from TCGCSV API"""
# Convert string dates to datetime objects
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
# Set minimum start date
min_start_date = datetime.strptime("2025-02-08", "%Y-%m-%d")
if start_dt < min_start_date:
start_dt = min_start_date
# Set maximum end date to today
today = datetime.now()
if end_dt > today:
end_dt = today
# Generate date range
date_range = []
current_dt = start_dt
while current_dt <= end_dt:
date_range.append(current_dt.strftime("%Y-%m-%d"))
current_dt += timedelta(days=1)
# Process each date
for date_str in date_range:
await self.get_archived_prices_for_date(date_str)
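# Usage sketch (dates illustrative; archives start at 2025-02-08):
#   service = TCGCSVService()
#   await service.get_archived_prices_for_date_range("2025-02-08", "2025-02-10")
#   # extracted files land under app/data/cache/tcgcsv/prices/{date_str}/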
async def sync_groups_to_db(self, db: Session, game_ids: List[int]) -> List[TCGPlayerGroup]:
"""Fetch groups from API and sync them to the database"""