# ai_giga_tcg/app/services/pull_sheet_service.py
#
# 173 lines
# 7.8 KiB
# Python

from typing import List, Dict
import json
import pandas as pd
from datetime import datetime
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from weasyprint import HTML
import logging
import asyncio
from app.schemas.file import FileInDB
from app.services.base_service import BaseService
from sqlalchemy.orm import Session
from app.models.tcgplayer_products import TCGPlayerProduct, TCGPlayerGroup, MTGJSONSKU, MTGJSONCard
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PullSheetService(BaseService):
    """Render pull sheet CSV files into PDF pull sheets.

    The service reads a pull sheet CSV with pandas, looks up a color identity
    for every row (database lookups followed by a call to the 'scryfall'
    service), renders the ``pull_sheet.html`` Jinja2 template and hands the
    resulting HTML to the 'file' service for storage.
    """

    # Rank of each color symbol; used to order a color identity as W > U > B > R > G.
    _COLOR_ORDER = {color: rank for rank, color in enumerate("WUBRG")}
    # Placeholder returned when the color identity cannot be determined.
    _UNKNOWN_COLOR = '?'
    # Marker returned for an empty (colorless) color identity.
    _COLORLESS = 'C'

    def __init__(self):
        """Initialise the base service and load the pull sheet template."""
        super().__init__(None)
        # NOTE(review): relative path, so it is resolved against the process
        # working directory - confirm the service is always started from the
        # project root.
        self.template_dir = Path("app/data/assets/templates")
        self.env = Environment(loader=FileSystemLoader(str(self.template_dir)))
        self.template = self.env.get_template("pull_sheet.html")

    async def get_or_create_rendered_pull_sheet(self, db: Session, order_ids: list[str]) -> FileInDB:
        """Return the rendered pull sheet PDF for ``order_ids``, creating it if needed.

        Lookup order:
          1. an existing "rendered_pull_sheet" PDF for these order ids;
          2. an existing "pull_sheet" CSV, which is rendered to PDF;
          3. a pull sheet obtained from the 'order_management' service, which
             is then rendered to PDF.

        Args:
            db: Database session passed through to the other services.
            order_ids: Order ids the pull sheet covers.

        Returns:
            The file record of the rendered pull sheet.
        """
        file_service = self.get_service('file')

        # NOTE(review): generate_pull_sheet_pdf stores 'order_ids' sorted while
        # the lookups below use the caller's ordering - confirm that
        # get_file_by_metadata is insensitive to list order.
        rendered_pull_sheet = await file_service.get_file_by_metadata(
            db, "order_ids", order_ids, "rendered_pull_sheet", "application/pdf"
        )
        if rendered_pull_sheet:
            return rendered_pull_sheet

        pull_sheet_data_file = await file_service.get_file_by_metadata(
            db, "order_ids", order_ids, "pull_sheet", "text/csv"
        )
        if not pull_sheet_data_file:
            # No stored CSV either: ask the order management service for one.
            order_service = self.get_service('order_management')
            pull_sheet_data_file = await order_service.get_pull_sheet(db, order_ids)

        return await self.generate_pull_sheet_pdf(db, pull_sheet_data_file)

    async def generate_pull_sheet_pdf(self, db: Session, file: FileInDB) -> FileInDB:
        """Generate a PDF pull sheet from a CSV file.

        Args:
            db: Database session used for the lookups and for saving the file.
            file: FileInDB object describing the pull sheet CSV (its ``path``
                and ``file_metadata`` are read).

        Returns:
            The file record returned by the file service for the rendered sheet.

        Raises:
            Exception: Any error raised while reading, rendering or saving is
                logged with its traceback and re-raised unchanged.
        """
        try:
            items = await self._read_and_process_csv(db, file.path)

            # Take a single timestamp so the date printed on the sheet and the
            # one embedded in the filename always agree.
            generated_at = datetime.now()
            html_content = self.template.render(
                items=items,
                generation_date=generated_at.strftime("%Y-%m-%d %H:%M:%S"),
            )

            # Copy the metadata so the incoming file object is left untouched,
            # and store the order ids in sorted order.
            metadata = file.file_metadata.copy() if file.file_metadata else {}
            if 'order_ids' in metadata:
                metadata['order_ids'] = sorted(metadata['order_ids'])

            file_service = self.get_service('file')
            return await file_service.save_file(
                db=db,
                file_data=html_content,
                filename=f"rendered_pull_sheet_{generated_at.strftime('%Y%m%d_%H%M%S')}.pdf",
                subdir="tcgplayer/pull_sheets/rendered",
                file_type="rendered_pull_sheet",
                content_type="application/pdf",
                metadata=metadata,
                html_content=True  # This tells FileService to convert HTML to PDF
            )
        except Exception as e:
            # logger.exception keeps the original message and adds the traceback.
            logger.exception("Error generating pull sheet PDF: %s", e)
            raise

    @staticmethod
    def _format_card_number(row) -> str:
        """Return the row's 'Number' value as display text.

        A missing column or a missing (NaN) value gives ``''``. Values
        containing ``'/'`` are returned as-is. Other values are converted to an
        integer string where possible (so ``12.0`` becomes ``'12'``); values
        that are not integers, such as ``'250a'``, are returned unchanged
        instead of raising.
        """
        if 'Number' not in row:
            return ''
        value = row['Number']
        if pd.isna(value):
            return ''
        text = str(value)
        if '/' in text:
            return text
        try:
            return str(int(value))
        except (TypeError, ValueError):
            return text

    async def _get_color_identity(self, db: Session, row: pd.Series) -> str:
        """Get the color identity for a pull sheet row.

        The row is resolved group -> product -> MTGJSON SKU -> MTGJSON card to
        obtain a Scryfall id, which is passed to the 'scryfall' service.

        Args:
            db: Database session used for the lookups.
            row: Row providing 'Set', 'Product Name', 'Rarity' and optionally
                'Number'.

        Returns:
            The color symbols joined in W, U, B, R, G order, ``'C'`` for an
            empty identity, or ``'?'`` when any lookup yields nothing.
        """

        def unknown(missing: str) -> str:
            # Query.first() returns None when nothing matches; report it and
            # fall back to the placeholder rather than failing the whole sheet.
            logger.warning(
                "No %s found for %r in set %r; color identity unknown",
                missing, row.get('Product Name'), row.get('Set'),
            )
            return self._UNKNOWN_COLOR

        group = db.query(TCGPlayerGroup).filter(TCGPlayerGroup.name == row['Set']).first()
        if group is None:
            return unknown("TCGPlayer group")

        number = self._format_card_number(row)
        product = (
            db.query(TCGPlayerProduct)
            .filter(TCGPlayerProduct.group_id == group.group_id)
            .filter(TCGPlayerProduct.name == row['Product Name'])
            .filter(TCGPlayerProduct.ext_number == number)
            .filter(TCGPlayerProduct.ext_rarity == row['Rarity'])
            .first()
        )
        if product is None:
            return unknown("TCGPlayer product")

        sku = (
            db.query(MTGJSONSKU)
            .filter(MTGJSONSKU.tcgplayer_product_id == product.tcgplayer_product_id)
            .first()
        )
        if sku is None:
            return unknown("MTGJSON SKU")

        card = db.query(MTGJSONCard).filter(MTGJSONCard.mtgjson_uuid == sku.mtgjson_uuid).first()
        if card is None:
            return unknown("MTGJSON card")

        scryfall_service = self.get_service('scryfall')
        color_identity = await scryfall_service.get_color_identity(card.scryfall_id)
        if color_identity is None:
            return self._UNKNOWN_COLOR

        # NOTE(review): the value is iterated element by element here, so it is
        # presumably a sequence of color symbols - confirm against the
        # scryfall service.
        colors = [str(color) for color in color_identity]
        if not colors:
            return self._COLORLESS

        # Order W > U > B > R > G; symbols outside that set sort last instead
        # of raising.
        order = self._COLOR_ORDER
        colors.sort(key=lambda color: order.get(color, len(order)))
        return ''.join(colors)

    async def _update_row_color_identity(self, db: Session, row: pd.Series) -> pd.Series:
        """Store the row's color identity under 'Color Identity'.

        Args:
            db: Database session used for the lookups.
            row: Row to update; it is modified in place.

        Returns:
            The same row object, with 'Color Identity' set.
        """
        row['Color Identity'] = await self._get_color_identity(db, row)
        return row

    async def _read_and_process_csv(self, db: Session, csv_path: str) -> List[Dict]:
        """Read and process CSV data using pandas.

        Args:
            db: Database session used for the color identity lookups.
            csv_path: Path to the CSV file.

        Returns:
            One dict per remaining row with the keys 'product_name',
            'condition', 'quantity', 'set', 'rarity', 'card_number' and
            'color_identity', ordered by set release date (newest first), then
            set, then product name.
        """
        # Parse the CSV in the default executor so the event loop is not blocked.
        df = await asyncio.get_running_loop().run_in_executor(None, pd.read_csv, csv_path)

        # Filter out the "Orders Contained in Pull Sheet" row
        df = df[df['Product Line'] != 'Orders Contained in Pull Sheet:'].copy()

        df['Set Release Date'] = pd.to_datetime(df['Set Release Date'], format='%m/%d/%Y %H:%M:%S')

        # Sort by Set Release Date (descending), then Set and Product Name (ascending)
        df = df.sort_values(['Set Release Date', 'Set', 'Product Name'], ascending=[False, True, True])

        # Rows describing the same card resolve to the same color identity, so
        # each distinct card is looked up only once per sheet.
        resolved: Dict[tuple, str] = {}
        items = []
        for _, row in df.iterrows():
            card_number = self._format_card_number(row)
            card_key = (row['Set'], row['Product Name'], card_number, row['Rarity'])
            if card_key not in resolved:
                resolved[card_key] = await self._get_color_identity(db, row)

            items.append({
                'product_name': row['Product Name'],
                'condition': row['Condition'],
                'quantity': str(int(row['Quantity'])),  # Convert to string for template
                'set': row['Set'],
                'rarity': row['Rarity'],
                'card_number': card_number,
                'color_identity': resolved[card_key],
            })
        return items