giga_tcg/app/db/database.py
2025-02-07 20:54:55 -05:00

116 lines
3.8 KiB
Python

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager
from typing import Generator
import os
from sqlalchemy import inspect
from app.services.tcgplayer import TCGPlayerService
from app.services.pricing import PricingService
from app.services.file import FileService
from app.db.models import Price
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
# Get database URL from environment variable with fallback
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///omegacard.db")
# Create engine with proper configuration
engine = create_engine(
DATABASE_URL,
pool_pre_ping=True, # Enable connection health checks
pool_size=5, # Set reasonable pool size
max_overflow=10 # Allow some overflow connections
)
# Create session factory
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False
)
@contextmanager
def get_db_session() -> Generator[Session, None, None]:
"""Context manager for database sessions"""
session = SessionLocal()
try:
yield session
except Exception as e:
logger.error(f"Database session error: {str(e)}")
session.rollback()
raise
finally:
session.close()
def get_db() -> Generator[Session, None, None]:
"""Dependency for FastAPI to get database sessions"""
with get_db_session() as session:
yield session
def prepopulate_data(db: Session, db_exist: bool = False) -> None:
file_service = FileService(db)
tcgplayer_service = TCGPlayerService(db, file_service)
pricing_service = PricingService(db, file_service, tcgplayer_service)
if not db_exist:
tcgplayer_service.populate_tcgplayer_groups()
file = tcgplayer_service.load_tcgplayer_cards()
pricing_service.cron_load_prices(file)
else:
pricing_service.cron_load_prices()
def init_db() -> None:
"""Initialize database tables and run first-time setup if needed"""
from .models import Base
try:
inspector = inspect(engine)
tables_exist = all(
table in inspector.get_table_names()
for table in Base.metadata.tables.keys()
)
if tables_exist:
with get_db_session() as db:
# get date created of latest pricing record
latest_price = db.query(Price).order_by(Price.date_created.desc()).first()
if latest_price:
# check if it is greater than 1.5 hours old
if (datetime.now() - latest_price.date_created).total_seconds() > 5400:
prepopulate_data(db, db_exist=True)
else:
prepopulate_data(db, db_exist=True)
# Create tables if they don't exist
Base.metadata.create_all(bind=engine)
# Run first-time setup only if tables were just created
if not tables_exist:
with get_db_session() as db:
prepopulate_data(db)
logger.info("First-time database setup completed")
logger.info("Database initialization completed")
except Exception as e:
logger.error(f"Failed to initialize database: {str(e)}")
raise
def check_db_connection() -> bool:
"""Check if database connection is working"""
try:
with get_db_session() as session:
session.execute(text("SELECT 1"))
return True
except Exception as e:
logger.error(f"Database connection check failed: {str(e)}")
return False
def destroy_db() -> None:
"""Destroy all database tables"""
from .models import Base
try:
Base.metadata.drop_all(bind=engine)
logger.info("Database tables dropped successfully")
except Exception as e:
logger.error(f"Failed to destroy database: {str(e)}")
raise