i think most of this works lole

2025-04-24 23:34:13 -04:00
parent 210a033695
commit 56ba750aad
50 changed files with 154001 additions and 2606 deletions


@@ -1,23 +1,26 @@
 import os
 import json
-from datetime import datetime, timedelta
-from typing import Optional, List, Dict, Any, Union, Generator, Callable, AsyncGenerator
+from datetime import datetime, timezone
+from typing import Optional, List, Dict, Any, Union
 from sqlalchemy.orm import Session
-from app.models.tcgplayer_group import TCGPlayerGroup
-from app.models.tcgplayer_product import TCGPlayerProduct
-from app.models.tcgplayer_category import TCGPlayerCategory
+from app.models.tcgplayer_products import TCGPlayerProduct, TCGPlayerCategory, TCGPlayerGroup
+from app.models.inventory_management import SealedExpectedValue
 from app.services.base_service import BaseService
 from app.schemas.file import FileInDB
-from app.db.database import transaction
+from app.db.database import transaction as db_transaction
+from app.schemas.transaction import PurchaseTransactionCreate, PurchaseItem
+from app.contexts.inventory_item import InventoryItemContextFactory
+from app.models.tcgplayer_products import MTGJSONSKU, MTGJSONCard
+from app.models.tcgplayer_products import TCGPlayerPriceHistory
 import csv
 import io
 import logging
-from app.models.tcgplayer_price_history import TCGPlayerPriceHistory
+from sqlalchemy import and_, bindparam, update, insert
 import py7zr
 import shutil

 logger = logging.getLogger(__name__)

 class DataInitializationService(BaseService):
     def __init__(self):
         super().__init__(None)
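Review note: the `transaction as db_transaction` alias is load-bearing, not cosmetic. The new `initialize_inventory_data` method at the bottom of this file assigns a local variable named `transaction`, and Python treats any name assigned anywhere in a function as local for the whole function body. A minimal, self-contained sketch of the collision the alias avoids (the stand-in context manager is hypothetical):

    from contextlib import contextmanager

    @contextmanager
    def transaction(db):  # stand-in for app.db.database.transaction
        yield

    def demo(db=None):
        with transaction(db):  # raises UnboundLocalError: 'transaction' is local here...
            pass
        transaction = "purchase transaction result"  # ...because of this later assignment

    demo()  # UnboundLocalError

Importing the context manager as db_transaction sidesteps the shadowing entirely.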
@@ -54,7 +57,8 @@ class DataInitializationService(BaseService):
         file_record = await self.file_service.get_file_by_filename(db, filename)
         if file_record:
             # Check if cache is expired (7 days)
-            cache_age = datetime.now() - file_record.created_at
+            # Ensure both datetimes are timezone-aware
+            cache_age = datetime.now(timezone.utc) - file_record.created_at
             if cache_age.days < 7:
                 with open(file_record.path, 'r') as f:
                     return json.load(f)
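Review note: this fixes a real failure mode, since Python refuses to subtract an aware datetime from a naive one. A minimal sketch, assuming file_record.created_at is stored timezone-aware as the new comment implies:

    from datetime import datetime, timezone

    created_at = datetime(2025, 4, 17, tzinfo=timezone.utc)  # aware, like file_record.created_at
    # datetime.now() - created_at  # TypeError: can't subtract offset-naive and offset-aware datetimes
    cache_age = datetime.now(timezone.utc) - created_at      # both aware: fine
    print(cache_age.days < 7)

If rows written before this change hold naive timestamps, the subtraction will still raise; normalizing on read (created_at.replace(tzinfo=timezone.utc) when tzinfo is None) is one way to bridge old data.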
@@ -70,7 +74,7 @@ class DataInitializationService(BaseService):
         batch_size = 1000  # Process in batches of 1000
         total_categories = len(categories)

-        with transaction(db):
+        with db_transaction(db):
             for i in range(0, total_categories, batch_size):
                 batch = categories[i:i + batch_size]
                 for category_data in batch:
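Review note on the batching idiom reused throughout this file: range with a step plus slicing yields fixed-size chunks, and the final short slice truncates safely:

    items = list(range(2500))
    batch_size = 1000
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    print([len(b) for b in batches])  # [1000, 1000, 500]

Worth noting that the whole loop runs inside a single db_transaction block, so the batches bound Python-side memory churn but not transaction size; a commit per batch would need the context manager inside the loop.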
@@ -150,7 +154,7 @@ class DataInitializationService(BaseService):
         batch_size = 1000  # Process in batches of 1000
         total_groups = len(groups)

-        with transaction(db):
+        with db_transaction(db):
             for i in range(0, total_groups, batch_size):
                 batch = groups[i:i + batch_size]
                 for group_data in batch:
@@ -214,8 +218,6 @@ class DataInitializationService(BaseService):
     async def sync_products(self, db: Session, products_data: str):
         """Sync products data to the database using streaming for large datasets"""
-        import csv
-        import io
         # Parse CSV data
         csv_reader = csv.DictReader(io.StringIO(products_data))
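Review note: hoisting csv/io to module scope is tidy; a reminder for reviewers that DictReader never converts types, which is why every numeric field below goes through a float(...) guard:

    import csv
    import io

    data = "productId,name,lowPrice\n123,Foo,\n456,Bar,9.99\n"
    for row in csv.DictReader(io.StringIO(data)):
        print(row)  # values are all strings; empty cells come back as ''
    # {'productId': '123', 'name': 'Foo', 'lowPrice': ''}
    # {'productId': '456', 'name': 'Bar', 'lowPrice': '9.99'}

An empty string is falsy, so float(row["lowPrice"]) if row["lowPrice"] else None maps '' to None rather than raising.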
@@ -223,36 +225,46 @@ class DataInitializationService(BaseService):
         batch_size = 1000  # Process in batches of 1000
         total_products = len(products_list)

-        with transaction(db):
+        with db_transaction(db):
             for i in range(0, total_products, batch_size):
                 batch = products_list[i:i + batch_size]
                 for product_data in batch:
-                    existing_product = db.query(TCGPlayerProduct).filter(TCGPlayerProduct.product_id == product_data["productId"]).first()
+                    sub_type_name = product_data.get("subTypeName") if product_data.get("subTypeName") else "other"
+                    existing_product = db.query(TCGPlayerProduct).filter(TCGPlayerProduct.tcgplayer_product_id == product_data["productId"]).filter(TCGPlayerProduct.sub_type_name == sub_type_name).first()
                     if existing_product:
                         # Update existing product
                         for key, value in {
                             "name": product_data["name"],
                             "clean_name": product_data.get("cleanName"),
                             "image_url": product_data.get("imageUrl"),
+                            "sub_type_name": product_data.get("subTypeName") if product_data.get("subTypeName") else "other",
+                            "normalized_sub_type_name": product_data.get("subTypeName").lower().replace(" ", "_") if product_data.get("subTypeName") else "other",
                             "category_id": product_data.get("categoryId"),
                             "group_id": product_data.get("groupId"),
                             "url": product_data.get("url"),
                             "modified_on": datetime.fromisoformat(product_data["modifiedOn"].replace("Z", "+00:00")) if product_data.get("modifiedOn") else None,
                             "image_count": product_data.get("imageCount", 0),
                             "ext_rarity": product_data.get("extRarity"),
                             "ext_subtype": product_data.get("extSubtype"),
                             "ext_oracle_text": product_data.get("extOracleText"),
                             "ext_number": product_data.get("extNumber"),
                             "low_price": float(product_data.get("lowPrice")) if product_data.get("lowPrice") else None,
                             "mid_price": float(product_data.get("midPrice")) if product_data.get("midPrice") else None,
                             "high_price": float(product_data.get("highPrice")) if product_data.get("highPrice") else None,
                             "market_price": float(product_data.get("marketPrice")) if product_data.get("marketPrice") else None,
                             "direct_low_price": float(product_data.get("directLowPrice")) if product_data.get("directLowPrice") else None,
-                            "sub_type_name": product_data.get("subTypeName")
+                            "ext_power": product_data.get("extPower"),
+                            "ext_toughness": product_data.get("extToughness"),
+                            "ext_flavor_text": product_data.get("extFlavorText")
                         }.items():
                             setattr(existing_product, key, value)
                     else:
+                        logger.debug(f"Creating new product: {product_data['productId']} product name: {product_data['name']}")
                         new_product = TCGPlayerProduct(
-                            product_id=product_data["productId"],
+                            tcgplayer_product_id=product_data["productId"],
                             name=product_data["name"],
+                            normalized_sub_type_name=product_data.get("subTypeName").lower().replace(" ", "_") if product_data.get("subTypeName") else "other",
                             clean_name=product_data.get("cleanName"),
                             image_url=product_data.get("imageUrl"),
                             category_id=product_data.get("categoryId"),
@@ -269,7 +281,7 @@ class DataInitializationService(BaseService):
                             high_price=float(product_data.get("highPrice")) if product_data.get("highPrice") else None,
                             market_price=float(product_data.get("marketPrice")) if product_data.get("marketPrice") else None,
                             direct_low_price=float(product_data.get("directLowPrice")) if product_data.get("directLowPrice") else None,
-                            sub_type_name=product_data.get("subTypeName"),
+                            sub_type_name=product_data.get("subTypeName") if product_data.get("subTypeName") else "other",
                             ext_power=product_data.get("extPower"),
                             ext_toughness=product_data.get("extToughness"),
                             ext_flavor_text=product_data.get("extFlavorText")
@@ -319,50 +331,81 @@ class DataInitializationService(BaseService):
     async def sync_archived_prices(self, db: Session, archived_prices_data: dict, date: datetime):
         """Sync archived prices data to the database using bulk operations.
         Note: Historical prices are never updated, only new records are inserted."""
-        from sqlalchemy import insert
-        from app.models.tcgplayer_price_history import TCGPlayerPriceHistory
-
-        # Prepare data for bulk operations
-        price_records = []
-        for price_data in archived_prices_data.get("results", []):
-            record = {
-                "product_id": price_data["productId"],
-                "date": date,
-                "sub_type_name": price_data["subTypeName"],
-                "low_price": price_data.get("lowPrice"),
-                "mid_price": price_data.get("midPrice"),
-                "high_price": price_data.get("highPrice"),
-                "market_price": price_data.get("marketPrice"),
-                "direct_low_price": price_data.get("directLowPrice")
-            }
-            price_records.append(record)
-
-        if not price_records:
+        if not archived_prices_data.get("success"):
+            logger.error("Price data sync failed - success flag is false")
             return
-
-        # Get existing records in bulk to avoid duplicates
-        product_ids = [r["product_id"] for r in price_records]
-        sub_type_names = [r["sub_type_name"] for r in price_records]
+        # Get existing records in bulk to avoid duplicates using a composite key
         existing_records = db.query(TCGPlayerPriceHistory).filter(
-            TCGPlayerPriceHistory.product_id.in_(product_ids),
-            TCGPlayerPriceHistory.date == date,
-            TCGPlayerPriceHistory.sub_type_name.in_(sub_type_names)
+            TCGPlayerPriceHistory.date == date
         ).all()
         existing_keys = {(r.product_id, r.date, r.sub_type_name) for r in existing_records}
-
-        # Filter out existing records
-        to_insert = [
-            record for record in price_records
-            if (record["product_id"], record["date"], record["sub_type_name"]) not in existing_keys
-        ]
-
-        # Perform bulk insert for new records only
-        if to_insert:
-            stmt = insert(TCGPlayerPriceHistory)
-            db.execute(stmt, to_insert)
-            db.commit()
+        # Prepare batch insert data
+        price_history_batch = []
+        # Process price data in batches
+        for price_data in archived_prices_data.get("results", []):
+            try:
+                # Get the subtype name from the price data
+                sub_type_name = price_data.get("subTypeName", "other")
+                # First try to find product with the requested subtype
+                product = db.query(TCGPlayerProduct).filter(
+                    TCGPlayerProduct.tcgplayer_product_id == price_data["productId"],
+                    TCGPlayerProduct.sub_type_name == sub_type_name
+                ).first()
+                # If not found and subtype isn't "other", try with "other" subtype
+                if not product and sub_type_name != "other":
+                    product = db.query(TCGPlayerProduct).filter(
+                        TCGPlayerProduct.tcgplayer_product_id == price_data["productId"],
+                        TCGPlayerProduct.sub_type_name == "other"
+                    ).first()
+                    if product:
+                        sub_type_name = "other"
+                        #logger.info(f"Found product {price_data['productId']} with 'other' subtype as fallback for {sub_type_name}")
+                if not product:
+                    logger.warning(f"No product found for {price_data['productId']} with subtype {sub_type_name} or 'other'")
+                    continue
+                # Skip if record already exists
+                if (product.tcgplayer_product_id, date, sub_type_name) in existing_keys:
+                    continue
+                # Validate and convert price data
+                try:
+                    price_history = TCGPlayerPriceHistory(
+                        product_id=product.tcgplayer_product_id,
+                        sub_type_name=sub_type_name,
+                        date=date,
+                        low_price=float(price_data.get("lowPrice")) if price_data.get("lowPrice") else None,
+                        mid_price=float(price_data.get("midPrice")) if price_data.get("midPrice") else None,
+                        high_price=float(price_data.get("highPrice")) if price_data.get("highPrice") else None,
+                        market_price=float(price_data.get("marketPrice")) if price_data.get("marketPrice") else None,
+                        direct_low_price=float(price_data.get("directLowPrice")) if price_data.get("directLowPrice") else None
+                    )
+                    price_history_batch.append(price_history)
+                except (ValueError, TypeError) as e:
+                    logger.error(f"Invalid price data for product {price_data['productId']}: {str(e)}")
+                    continue
+                # Process in batches of 1000
+                if len(price_history_batch) >= 1000:
+                    with db_transaction(db):
+                        db.bulk_save_objects(price_history_batch)
+                    price_history_batch = []
+            except Exception as e:
+                logger.error(f"Error processing price data for product {price_data['productId']}: {str(e)}")
+                continue
+        # Process any remaining records
+        if price_history_batch:
+            with db_transaction(db):
+                db.bulk_save_objects(price_history_batch)

     async def init_archived_prices(self, db: Session, start_date: datetime, end_date: datetime, use_cache: bool = True, game_ids: List[int] = None) -> bool:
         """Initialize archived prices data"""
@@ -470,7 +513,7 @@ class DataInitializationService(BaseService):
             # Get SKUs data
             skus_data = await mtgjson_service.get_skus(db, use_cache)
             if skus_data and "data" in skus_data:
-                skus_count = await self.sync_mtgjson_skus(db, list(skus_data["data"].values()))
+                skus_count = await self.sync_mtgjson_skus(db, skus_data)

             return {
                 "identifiers_processed": identifiers_count,
@@ -479,27 +522,20 @@ class DataInitializationService(BaseService):
     async def sync_mtgjson_identifiers(self, db: Session, identifiers_data: List[dict]) -> int:
         """Sync MTGJSON identifiers data to the database"""
-        from app.models.mtgjson_card import MTGJSONCard
-
         count = 0
-        with transaction(db):
+        with db_transaction(db):
             for card_data in identifiers_data:
                 if not isinstance(card_data, dict):
                     logger.debug(f"Skipping non-dict item: {card_data}")
                     continue
-                card_id = card_data.get("uuid")
-                if not card_id:
-                    logger.debug(f"Skipping item without UUID: {card_data}")
-                    continue
-
-                existing_card = db.query(MTGJSONCard).filter(MTGJSONCard.card_id == card_id).first()
+                existing_card = db.query(MTGJSONCard).filter(MTGJSONCard.mtgjson_uuid == card_data.get("uuid")).first()
                 if existing_card:
                     # Update existing card
                     for key, value in {
                         "name": card_data.get("name"),
                         "set_code": card_data.get("setCode"),
-                        "uuid": card_data.get("uuid"),
                         "abu_id": card_data.get("identifiers", {}).get("abuId"),
                         "card_kingdom_etched_id": card_data.get("identifiers", {}).get("cardKingdomEtchedId"),
                         "card_kingdom_foil_id": card_data.get("identifiers", {}).get("cardKingdomFoilId"),
@@ -530,10 +566,9 @@ class DataInitializationService(BaseService):
                         setattr(existing_card, key, value)
                 else:
                     new_card = MTGJSONCard(
-                        card_id=card_id,
+                        mtgjson_uuid=card_data.get("uuid"),
                         name=card_data.get("name"),
                         set_code=card_data.get("setCode"),
-                        uuid=card_data.get("uuid"),
                         abu_id=card_data.get("identifiers", {}).get("abuId"),
                         card_kingdom_etched_id=card_data.get("identifiers", {}).get("cardKingdomEtchedId"),
                         card_kingdom_foil_id=card_data.get("identifiers", {}).get("cardKingdomFoilId"),
@@ -566,38 +601,35 @@ class DataInitializationService(BaseService):
         return count

-    async def sync_mtgjson_skus(self, db: Session, skus_data: List[List[dict]]) -> int:
+    async def sync_mtgjson_skus(self, db: Session, skus_data: dict) -> int:
         """Sync MTGJSON SKUs data to the database"""
-        from app.models.mtgjson_sku import MTGJSONSKU
-
         count = 0
-        with transaction(db):
-            for product_data in skus_data:
-                for sku_data in product_data:
-                    sku_id = sku_data.get("skuId")
-                    if not sku_id:
-                        logger.debug(f"Skipping item without SKU ID: {sku_data}")
-                        continue
-
-                    existing_sku = db.query(MTGJSONSKU).filter(MTGJSONSKU.sku_id == str(sku_id)).first()
-                    if existing_sku:
+        with db_transaction(db):
+            for mtgjson_uuid, product_data in skus_data['data'].items():
+                for sku_data in product_data:
+                    existing_record = db.query(MTGJSONSKU).filter(MTGJSONSKU.mtgjson_uuid == mtgjson_uuid).filter(MTGJSONSKU.tcgplayer_sku_id == sku_data.get("skuId")).first()
+                    if existing_record:
                         # Update existing SKU
                         for key, value in {
-                            "product_id": sku_data.get("productId"),
+                            "tcgplayer_product_id": sku_data.get("productId"),
                             "condition": sku_data.get("condition"),
                             "finish": sku_data.get("finish"),
                             "language": sku_data.get("language"),
                             "printing": sku_data.get("printing"),
+                            "normalized_printing": sku_data.get("printing").lower().replace(" ", "_") if sku_data.get("printing") else None
                         }.items():
-                            setattr(existing_sku, key, value)
+                            setattr(existing_record, key, value)
                     else:
                         new_sku = MTGJSONSKU(
-                            sku_id=sku_id,
-                            product_id=sku_data.get("productId"),
+                            mtgjson_uuid=mtgjson_uuid,
+                            tcgplayer_sku_id=sku_data.get("skuId"),
+                            tcgplayer_product_id=sku_data.get("productId"),
                            condition=sku_data.get("condition"),
                            finish=sku_data.get("finish"),
                            language=sku_data.get("language"),
                            printing=sku_data.get("printing"),
+                            normalized_printing=sku_data.get("printing").lower().replace(" ", "_") if sku_data.get("printing") else None
                         )
                         db.add(new_sku)
                         count += 1
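Review note: the new signature takes the whole AllSkus payload instead of a pre-flattened list, so the MTGJSON uuid survives as the outer key. Assumed shape, with a hypothetical uuid, matching the .get() calls in the diff:

    skus_data = {
        "data": {
            "00010d56-aaaa-bbbb-cccc-518019aa36a5": [
                {"skuId": 283, "productId": 42, "condition": "NEAR MINT",
                 "finish": "NON FOIL", "language": "ENGLISH", "printing": "NON FOIL"},
            ],
        },
    }
    for mtgjson_uuid, skus in skus_data["data"].items():
        for sku in skus:
            print(mtgjson_uuid, sku["skuId"], sku["productId"])

One existence query per SKU will be the slow path on a full AllSkus file; batching lookups per uuid (or a proper upsert) is the usual fix if this becomes a bottleneck.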
@@ -654,4 +686,62 @@ class DataInitializationService(BaseService):
                 if file.path.startswith(subdir):
                     await self.file_service.delete_file(db, file.id)
         await self.mtgjson_service.clear_cache()
         print("Cache cleared")
+
+    async def initialize_inventory_data(self, db: Session) -> None:
+        """Initialize inventory data"""
+        with db_transaction(db):
+            logger.info("Initializing inventory data...")
+            # set expected value
+            product_id1 = db.query(TCGPlayerProduct).filter(TCGPlayerProduct.tcgplayer_sku == "562118").first().id
+            expected_value_box = SealedExpectedValue(
+                product_id=product_id1,
+                expected_value=120.69
+            )
+            db.add(expected_value_box)
+            db.flush()
+            product_id2 = db.query(TCGPlayerProduct).filter(TCGPlayerProduct.tcgplayer_sku == "562119").first().id
+            expected_value_case = SealedExpectedValue(
+                product_id=product_id2,
+                expected_value=820.69
+            )
+            db.add(expected_value_case)
+            db.flush()
+            inventory_service = self.get_service("inventory")
+            customer = await inventory_service.create_customer(db, "Bob Smith")
+            vendor = await inventory_service.create_vendor(db, "Joe Blow")
+            marketplace = await inventory_service.create_marketplace(db, "Tcgplayer")
+            transaction = await inventory_service.create_purchase_transaction(db, PurchaseTransactionCreate(
+                vendor_id=vendor.id,
+                transaction_date=datetime.now(),
+                items=[PurchaseItem(product_id=product_id1, unit_price=100.69, quantity=1, is_case=False),
+                       PurchaseItem(product_id=product_id2, unit_price=800.01, quantity=2, is_case=True, num_boxes=6)],
+                transaction_notes="Test Transaction: 1 case and 2 boxes of foundations"
+            ))
+            logger.info(f"Transaction created: {transaction}")
+            case_num = 0
+            for item in transaction.transaction_items:
+                item = InventoryItemContextFactory(db).get_context(item.physical_item.inventory_item)
+                logger.info(f"Item: {item}")
+                if item.physical_item.item_type == "sealed_box":
+                    manabox_service = self.get_service("manabox")
+                    file_path = 'app/data/test_data/manabox_test_file.csv'
+                    with open(file_path, 'rb') as f:
+                        file_bytes = f.read()
+                    manabox_file = await manabox_service.process_manabox_csv(db, file_bytes, {"source": "test", "description": "test"}, wait=True)
+                    # Ensure manabox_file is a list before passing it
+                    if not isinstance(manabox_file, list):
+                        manabox_file = [manabox_file]
+                    sealed_box_service = self.get_service("sealed_box")
+                    sealed_box = sealed_box_service.get(db, item.physical_item.inventory_item.id)
+                    success = await inventory_service.process_manabox_import_staging(db, manabox_file, sealed_box)
+                    logger.info(f"sealed box opening success: {success}")
+                elif item.physical_item.item_type == "sealed_case":
+                    if case_num == 0:
+                        logger.info(f"sealed case {case_num} opening...")
+                        sealed_case_service = self.get_service("sealed_case")
+                        success = await sealed_case_service.open_sealed_case(db, item.physical_item)
+                        logger.info(f"sealed case {case_num} opening success: {success}")
+                    case_num += 1
+            logger.info("Inventory data initialized")