ai_giga_tcg/app/services/file_service.py
2025-04-18 15:19:57 -04:00

160 lines
5.7 KiB
Python

from typing import Optional, Union, List, Dict
import os
from pathlib import Path
from datetime import datetime
from sqlalchemy.orm import Session
from sqlalchemy import String
from app.models.file import File
from app.schemas.file import FileBase, FileCreate, FileInDB
from app.db.database import transaction
import logging
import asyncio
from weasyprint import HTML
import json
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class FileService:
    """Store files in an on-disk cache and track them as ``File`` rows.

    Every saved file is written beneath ``base_cache_dir`` and described by
    a ``File`` database record; lookups return ``FileInDB`` schemas
    validated from those records.
    """

    def __init__(self, base_cache_dir: str = "app/data/cache"):
        """Remember the cache root and create it if it does not exist.

        Args:
            base_cache_dir: Directory under which all cached files live.
        """
        self.base_cache_dir = Path(base_cache_dir)
        self.base_cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_path(self, subdir: str, filename: str) -> Path:
        """Get the full cache path for a file, creating ``subdir`` if needed.

        NOTE(review): ``subdir`` and ``filename`` are joined as given, so a
        value containing ``..`` or an absolute path resolves outside the
        cache root -- confirm callers never pass untrusted names here.
        """
        cache_dir = self.base_cache_dir / subdir
        cache_dir.mkdir(parents=True, exist_ok=True)
        return cache_dir / filename

    async def save_file(
        self,
        db: Session,
        file_data: Union[bytes, str],
        filename: str,
        subdir: str,
        file_type: Optional[str] = None,
        content_type: Optional[str] = None,
        metadata: Optional[Dict] = None,
        html_content: Optional[bool] = False
    ) -> FileInDB:
        """Save a file to the cache directory and create a database record.

        Args:
            db: Session used to insert and commit the ``File`` row.
            file_data: Content to store. ``str`` is written as UTF-8 text,
                ``bytes`` is written verbatim.
            filename: Name of the file inside ``subdir``.
            subdir: Sub-directory of the cache root to write into.
            file_type: Optional value stored on the record's ``file_type``.
            content_type: Optional value stored on ``content_type``.
            metadata: Optional mapping stored on ``file_metadata``.
            html_content: When true and ``file_data`` is a ``str``, the text
                is treated as HTML and rendered to a PDF at the target path
                instead of being written as-is.

        Returns:
            The newly created record validated as ``FileInDB``.

        Raises:
            Exception: Any failure is logged, the session is rolled back,
                and the original exception is re-raised.
        """
        try:
            cache_path = self._get_cache_path(subdir, filename)

            if html_content and isinstance(file_data, str):
                # Rendering is blocking work, so push it to the default
                # executor. get_running_loop() is the preferred way to get
                # the loop from inside a coroutine.
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(
                    None,
                    lambda: HTML(string=file_data).write_pdf(str(cache_path)),
                )
            elif isinstance(file_data, str):
                cache_path.write_text(file_data, encoding="utf-8")
            else:
                cache_path.write_bytes(file_data)

            # Size is read back from disk so it is also right for rendered
            # PDFs, whose length differs from the HTML input.
            file_record = File(
                name=filename,
                path=str(cache_path),
                file_type=file_type,
                content_type=content_type,
                size=cache_path.stat().st_size,
                file_metadata=metadata,
            )
            db.add(file_record)
            db.commit()
            db.refresh(file_record)
            return FileInDB.model_validate(file_record)
        except Exception as e:
            logger.error(f"Error saving file {filename}: {str(e)}")
            # A failed flush/commit leaves the session unusable until it is
            # rolled back; do it here so the caller's session stays usable.
            db.rollback()
            raise

    async def get_file(self, db: Session, file_id: int) -> Optional[FileInDB]:
        """Get a file record from the database, or ``None`` if no row matches."""
        file_record = db.query(File).filter(File.id == file_id).first()
        if file_record:
            return FileInDB.model_validate(file_record)
        return None

    async def get_file_path(self, db: Session, file_id: int) -> Optional[Path]:
        """Get the path to a file from its ID.

        Returns ``None`` when there is no such record or when the recorded
        path no longer exists on disk.
        """
        file_record = await self.get_file(db, file_id)
        if file_record is None:
            return None
        stored_path = Path(file_record.path)
        return stored_path if stored_path.exists() else None

    async def delete_file(self, db: Session, file_id: int) -> bool:
        """Delete a file and its database record.

        Returns:
            ``True`` when a record was found and deleted, ``False`` when no
            record has the given ID.

        Raises:
            Exception: Any failure is logged, the session is rolled back,
                and the original exception is re-raised.
        """
        try:
            file_record = db.query(File).filter(File.id == file_id).first()
            if not file_record:
                return False

            # Read the path before committing: the instance is expired and
            # deleted afterwards, so its attributes can no longer be loaded.
            stored_path = Path(file_record.path)

            # Commit the row deletion first. The database change can be
            # rolled back, the file removal cannot, so a failed commit must
            # not leave a record pointing at a file that is already gone.
            db.delete(file_record)
            db.commit()

            # missing_ok avoids the exists()/remove() race and tolerates a
            # file that was already cleaned up.
            stored_path.unlink(missing_ok=True)
            return True
        except Exception as e:
            logger.error(f"Error deleting file {file_id}: {str(e)}")
            db.rollback()
            raise

    async def list_files(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 100,
        file_type: Optional[str] = None
    ) -> List[FileInDB]:
        """List files with optional filtering.

        Args:
            db: Session to query.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.
            file_type: When given, only records with this ``file_type``.

        Returns:
            The matching records, ordered by ID, as ``FileInDB`` objects.
        """
        query = db.query(File)
        if file_type:
            # The column is ``file_type`` (as used when creating records and
            # in get_file_by_metadata).
            query = query.filter(File.file_type == file_type)
        # OFFSET/LIMIT without an ORDER BY gives no stable page boundaries.
        files = query.order_by(File.id).offset(skip).limit(limit).all()
        return [FileInDB.model_validate(file) for file in files]

    async def get_file_by_metadata(
        self,
        db: Session,
        metadata_key: str,
        metadata_value: Union[str, List[str]],
        file_type: Optional[str] = None,
        content_type: Optional[str] = None
    ) -> Optional[FileInDB]:
        """Get the first file whose metadata entry matches a value.

        The stored ``file_metadata[metadata_key]`` is cast to a string and
        compared for equality. For ``order_ids`` given as a list, the list
        is sorted and JSON-encoded before comparing; any other value is
        compared via ``str()``.

        NOTE(review): this relies on the database's text rendering of the
        JSON element matching ``json.dumps`` / ``str`` output exactly, and
        on stored ``order_ids`` lists having been saved in sorted order --
        verify against the database backend and the code that writes them.

        Args:
            db: Session to query.
            metadata_key: Key inside ``file_metadata`` to compare.
            metadata_value: Value to look for.
            file_type: Optional additional filter on ``file_type``.
            content_type: Optional additional filter on ``content_type``.

        Returns:
            The first matching record as ``FileInDB``, or ``None``.
        """
        if metadata_key == "order_ids" and isinstance(metadata_value, list):
            # Sort so the comparison does not depend on the caller's order.
            expected = json.dumps(sorted(metadata_value))
        else:
            expected = str(metadata_value)

        query = db.query(File).filter(
            File.file_metadata[metadata_key].cast(String) == expected
        )
        if file_type:
            query = query.filter(File.file_type == file_type)
        if content_type:
            query = query.filter(File.content_type == content_type)

        file_record = query.first()
        if file_record:
            return FileInDB.model_validate(file_record)
        return None

    async def get_file_by_filename(self, db: Session, filename: str) -> Optional[FileInDB]:
        """Get a file record from the database by filename.

        Returns the first record whose ``name`` equals ``filename``, or
        ``None`` when there is none.
        """
        file_record = db.query(File).filter(File.name == filename).first()
        if file_record:
            return FileInDB.model_validate(file_record)
        return None