from typing import List, Dict import json import pandas as pd from datetime import datetime from pathlib import Path from jinja2 import Environment, FileSystemLoader from weasyprint import HTML import logging import asyncio from app.schemas.file import FileInDB from app.services.base_service import BaseService from sqlalchemy.orm import Session from app.models.tcgplayer_products import TCGPlayerProduct, TCGPlayerGroup, MTGJSONSKU, MTGJSONCard logger = logging.getLogger(__name__) class PullSheetService(BaseService): def __init__(self): super().__init__(None) self.template_dir = Path("app/data/assets/templates") self.env = Environment(loader=FileSystemLoader(str(self.template_dir))) self.template = self.env.get_template("pull_sheet.html") async def get_or_create_rendered_pull_sheet(self, db: Session, order_ids: list[str]) -> FileInDB: # get file service file_service = self.get_service('file') # check if rendered pull sheet exists rendered_pull_sheet = await file_service.get_file_by_metadata(db, "order_ids", order_ids, "rendered_pull_sheet", "application/pdf") if rendered_pull_sheet: return rendered_pull_sheet # check if pull sheet data file exists pull_sheet_data_file = await file_service.get_file_by_metadata(db, "order_ids", order_ids, "pull_sheet", "text/csv") if pull_sheet_data_file: # generate pdf from pull sheet data file return await self.generate_pull_sheet_pdf(db, pull_sheet_data_file) # if no pull sheet data file exists, get it from order management service order_service = self.get_service('order_management') pull_sheet_data_file = await order_service.get_pull_sheet(db, order_ids) return await self.generate_pull_sheet_pdf(db, pull_sheet_data_file) async def generate_pull_sheet_pdf(self, db: Session, file: FileInDB) -> FileInDB: """Generate a PDF pull sheet from a CSV file. Args: file: FileInDB object containing the pull sheet data Returns: Path to the generated PDF file """ try: # Read and process CSV data items = await self._read_and_process_csv(db, file.path) # Prepare template data template_data = { 'items': items, 'generation_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S") } # Render HTML html_content = self.template.render(**template_data) # Ensure metadata is properly formatted metadata = file.file_metadata.copy() if file.file_metadata else {} if 'order_ids' in metadata: metadata['order_ids'] = sorted(metadata['order_ids']) file_service = self.get_service('file') return await file_service.save_file( db=db, file_data=html_content, filename=f"rendered_pull_sheet_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf", subdir="tcgplayer/pull_sheets/rendered", file_type="rendered_pull_sheet", content_type="application/pdf", metadata=metadata, html_content=True # This tells FileService to convert HTML to PDF ) except Exception as e: logger.error(f"Error generating pull sheet PDF: {str(e)}") raise async def _get_color_identity(self, db: Session, row: pd.Series) -> str: """Get color identity from a row. Args: row: pandas Series """ # get category id from set name group_id = db.query(TCGPlayerGroup).filter(TCGPlayerGroup.name == row['Set']).first().group_id # format number number = str(int(row['Number'])) if 'Number' in row and pd.notna(row['Number']) and '/' not in str(row['Number']) else str(row['Number']) if 'Number' in row and pd.notna(row['Number']) and '/' in str(row['Number']) else '' # get product info from category id product_id = db.query(TCGPlayerProduct).filter(TCGPlayerProduct.group_id == group_id).filter(TCGPlayerProduct.name == row['Product Name']).filter(TCGPlayerProduct.ext_number == number).filter(TCGPlayerProduct.ext_rarity == row['Rarity']).first().tcgplayer_product_id # get scryfall id from product id mtgjson_id = db.query(MTGJSONSKU).filter(MTGJSONSKU.tcgplayer_product_id == product_id).first().mtgjson_uuid scryfall_id = db.query(MTGJSONCard).filter(MTGJSONCard.mtgjson_uuid == mtgjson_id).first().scryfall_id # get color identity from scryfall scryfall_service = self.get_service('scryfall') color_identity = await scryfall_service.get_color_identity(scryfall_id) if color_identity is None: return '?' # color identity is str of json array, convert to human readable string of list color_identity = [str(color) for color in color_identity] # if color identity is empty, return C for colorless if not color_identity: return 'C' # ensure order, W > U > B > R > G color_identity = sorted(color_identity, key=lambda x: ['W', 'U', 'B', 'R', 'G'].index(x)) color_identity = ''.join(color_identity) return color_identity async def _update_row_color_identity(self, db: Session, row: pd.Series) -> pd.Series: """Update color identity from a row. Args: row: pandas Series """ # get color identity from row color_identity = await self._get_color_identity(db, row) # update row with color identity row['Color Identity'] = color_identity return row async def _read_and_process_csv(self, db: Session, csv_path: str) -> List[Dict]: """Read and process CSV data using pandas. Args: csv_path: Path to the CSV file Returns: List of processed items """ # Read CSV into pandas DataFrame in a separate thread to avoid blocking df = await asyncio.get_event_loop().run_in_executor( None, lambda: pd.read_csv(csv_path) ) # Filter out the "Orders Contained in Pull Sheet" row df = df[df['Product Line'] != 'Orders Contained in Pull Sheet:'].copy() # Convert Set Release Date to datetime df['Set Release Date'] = pd.to_datetime(df['Set Release Date'], format='%m/%d/%Y %H:%M:%S') # Sort by Set Release Date (descending) and then Product Name (ascending) df = df.sort_values(['Set Release Date', 'Set', 'Product Name'], ascending=[False, True, True]) # Process color identities for all rows color_identities = [] for _, row in df.iterrows(): color_identity = await self._get_color_identity(db, row) color_identities.append(color_identity) # Add color identity column to dataframe df['Color Identity'] = color_identities # Convert to list of dictionaries items = [] for _, row in df.iterrows(): items.append({ 'product_name': row['Product Name'], 'condition': row['Condition'], 'quantity': str(int(row['Quantity'])), # Convert to string for template 'set': row['Set'], 'rarity': row['Rarity'], 'card_number': str(int(row['Number'])) if 'Number' in row and pd.notna(row['Number']) and '/' not in str(row['Number']) else str(row['Number']) if 'Number' in row and pd.notna(row['Number']) and '/' in str(row['Number']) else '', 'color_identity': row['Color Identity'] }) return items