we are so back
@@ -9,7 +9,7 @@ from typing import Dict, Any, Union, List
import csv
import logging
from datetime import datetime
import asyncio
from fastapi import BackgroundTasks

logger = logging.getLogger(__name__)

@@ -17,7 +17,7 @@ class ManaboxService(BaseService):
def __init__(self):
super().__init__(None)

async def process_manabox_csv(self, db: Session, bytes: bytes, metadata: Dict[str, Any], wait: bool = False) -> Union[bool, List[FileInDB]]:
async def process_manabox_csv(self, db: Session, bytes: bytes, metadata: Dict[str, Any], background_tasks: BackgroundTasks, wait: bool = False) -> Union[bool, List[FileInDB]]:
# save file
file = await self.file_service.save_file(
db=db,
@@ -29,34 +29,36 @@ class ManaboxService(BaseService):
metadata=metadata
)

# Create the background task
task = asyncio.create_task(self._process_file_background(db, file))

# If wait is True, wait for the task to complete and return the file
if wait:
await task
await self._process_file_background(db, file)
return_value = await self.file_service.get_file(db, file.id)
return [return_value] if return_value else []

return True
else:
background_tasks.add_task(self._process_file_background, db, file)
return True

async def _process_file_background(self, db: Session, file: FileInDB):
try:
# Read the CSV file
with open(file.path, 'r') as csv_file:
reader = csv.DictReader(csv_file)
logger.debug(f"Processing file: {file.path}")
# Pre-fetch all MTGJSONCards for Scryfall IDs in the file
scryfall_ids = {row['Scryfall ID'] for row in reader}
mtg_json_map = {card.scryfall_id: card for card in db.query(MTGJSONCard).filter(MTGJSONCard.scryfall_id.in_(scryfall_ids)).all()}
logger.debug(f"len ids: {len(scryfall_ids)}")

# Re-read the file to process the rows
csv_file.seek(0)
logger.debug(f"header: {reader.fieldnames}")
next(reader) # Skip the header row

staging_entries = [] # To collect all staging entries for batch insert
critical_errors = [] # To collect errors for logging

for row in reader:

logger.debug(f"Processing row: {row}")
mtg_json = mtg_json_map.get(row['Scryfall ID'])

if not mtg_json:
@@ -109,6 +111,7 @@ class ManaboxService(BaseService):

# Prepare the staging entry
quantity = int(row['Quantity'])
logger.debug(f"inserting row file id: {file.id} tcgplayer_product_id: {tcgplayer_product.tcgplayer_product_id} tcgplayer_sku_id: {tcgplayer_sku.tcgplayer_sku_id} quantity: {quantity}")
staging_entries.append(ManaboxImportStaging(
file_id=file.id,
tcgplayer_product_id=tcgplayer_product.tcgplayer_product_id,
@@ -118,10 +121,13 @@ class ManaboxService(BaseService):

# Bulk insert all valid ManaboxImportStaging entries
if staging_entries:
db.bulk_save_objects(staging_entries)
logger.debug(f"inserting {len(staging_entries)} rows")
with transaction(db):
db.bulk_save_objects(staging_entries)

# Log any critical errors that occurred
for error_message in critical_errors:
logger.debug(f"logging critical error: {error_message}")
with transaction(db):
critical_error_log = CriticalErrorLog(error_message=error_message)
db.add(critical_error_log)
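
Note (not part of the diff): dropping asyncio.create_task in favour of FastAPI's BackgroundTasks means the caller now has to pass the request-scoped BackgroundTasks object down to process_manabox_csv, since FastAPI only runs queued tasks after the response has been sent. Below is a minimal sketch of how a route could wire this up; the route path, get_db dependency, and metadata keys are assumptions for illustration, not code from this commit.

# Hypothetical caller; path, dependencies, and metadata keys are illustrative only.
from fastapi import APIRouter, BackgroundTasks, Depends, UploadFile
from sqlalchemy.orm import Session

router = APIRouter()

@router.post("/manabox/import")
async def import_manabox_csv(
    file: UploadFile,
    background_tasks: BackgroundTasks,  # injected per request by FastAPI
    wait: bool = False,
    db: Session = Depends(get_db),      # assumed session dependency
):
    contents = await file.read()
    service = ManaboxService()
    # When wait is False the service queues _process_file_background on
    # background_tasks and returns immediately; when True it awaits processing.
    result = await service.process_manabox_csv(
        db=db,
        bytes=contents,
        metadata={"filename": file.filename},  # assumed metadata shape
        background_tasks=background_tasks,
        wait=wait,
    )
    return {"result": result}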
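
Note: the with transaction(db): blocks added here rely on a transaction helper defined elsewhere in the repo; it is not shown in this diff. A common shape for such a helper is a contextmanager that commits on success and rolls back on error. The sketch below is an assumption about that behaviour, not the repo's actual implementation.

# Illustrative only: one typical implementation of a transaction(db) helper.
from contextlib import contextmanager
from sqlalchemy.orm import Session

@contextmanager
def transaction(db: Session):
    """Commit the unit of work on success, roll back on any exception."""
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise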
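
Note: the two-pass read in _process_file_background works because csv.DictReader caches fieldnames on the first pass; after csv_file.seek(0) the header row comes back as a data row, which is why the code calls next(reader) before the loop. A standalone illustration of that behaviour (not repo code):

# Demonstrates the seek(0) / next(reader) rewind pattern used above.
import csv
import io

buf = io.StringIO("Scryfall ID,Quantity\nabc-123,2\n")
reader = csv.DictReader(buf)
ids = {row["Scryfall ID"] for row in reader}  # first pass consumes the buffer

buf.seek(0)           # rewind the underlying file object
next(reader)          # fieldnames are cached, so skip the header row manually
rows = list(reader)   # second pass yields the data rows again
print(ids, rows)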