# ai_giga_tcg/app/services/inventory_service.py
# 2025-05-30 17:31:59 -04:00
#
# 506 lines
# 22 KiB
# Python

from typing import List, Optional, Dict, TypedDict
from sqlalchemy.orm import Session, joinedload
from decimal import Decimal
from app.services.base_service import BaseService
from app.models.manabox_import_staging import ManaboxImportStaging
from app.contexts.inventory_item import InventoryItemContextFactory
from app.models.inventory_management import (
OpenEvent, Card, InventoryItem, Case, SealedExpectedValue,
Transaction, TransactionItem, Customer, Vendor, Marketplace, Box, MarketplaceListing
)
from app.schemas.file import FileInDB
from app.models.inventory_management import PhysicalItem
from app.schemas.transaction import PurchaseTransactionCreate, SaleTransactionCreate, TransactionResponse, SealedExpectedValueCreate
from app.db.database import transaction as db_transaction
from datetime import datetime
from typing import Any
import logging
logger = logging.getLogger(__name__)
class InventoryService(BaseService):
    """Read helpers for inventory data plus creation of purchases and parties.

    Covers open events, inventory items, sealed expected values and
    transactions, and get-or-create helpers for customers, vendors and
    marketplaces.
    """

    def __init__(self):
        # Unlike the other services in this file, which pass a model class,
        # this service passes None to BaseService.
        super().__init__(None)

    async def get_resulting_items_for_open_event(self, db: Session, open_event: OpenEvent) -> List[InventoryItem]:
        """Return the non-deleted inventory items produced by ``open_event``."""
        resulting_item_ids = [item.id for item in open_event.resulting_items]
        return (
            db.query(InventoryItem)
            .filter(InventoryItem.physical_item_id.in_(resulting_item_ids))
            .filter(InventoryItem.deleted_at.is_(None))
            .all()
        )

    async def get_open_event(self, db: Session, inventory_item: InventoryItem, open_event_id: int) -> Optional[OpenEvent]:
        """Return the non-deleted open event ``open_event_id`` whose source is
        ``inventory_item``'s physical item, or None when there is no match."""
        return (
            db.query(OpenEvent)
            .filter(OpenEvent.source_item == inventory_item.physical_item)
            .filter(OpenEvent.id == open_event_id)
            .filter(OpenEvent.deleted_at.is_(None))
            .first()
        )

    async def get_open_events_for_inventory_item(self, db: Session, inventory_item: InventoryItem) -> List[OpenEvent]:
        """Return every non-deleted open event whose source is
        ``inventory_item``'s physical item."""
        return (
            db.query(OpenEvent)
            .filter(OpenEvent.source_item == inventory_item.physical_item)
            .filter(OpenEvent.deleted_at.is_(None))
            .all()
        )

    async def get_inventory_item(self, db: Session, inventory_item_id: int) -> Optional[InventoryItem]:
        """Return the inventory item with the given id, or None.

        The physical item and its ``product_direct`` are eagerly loaded.
        NOTE(review): unlike the other getters this does not filter on
        ``deleted_at`` - confirm that is intentional.
        """
        return (
            db.query(InventoryItem)
            .options(
                joinedload(InventoryItem.physical_item).joinedload(PhysicalItem.product_direct)
            )
            .filter(InventoryItem.id == inventory_item_id)
            .first()
        )

    async def get_expected_value(self, db: Session, product_id: int) -> Optional[float]:
        """Return the non-deleted expected value recorded for ``product_id``
        (matched on ``tcgplayer_product_id``), or None when none exists."""
        expected_value = (
            db.query(SealedExpectedValue)
            .filter(SealedExpectedValue.tcgplayer_product_id == product_id)
            .filter(SealedExpectedValue.deleted_at.is_(None))
            .first()
        )
        return expected_value.expected_value if expected_value else None

    async def get_transactions(self, db: Session, skip: int, limit: int) -> List[Transaction]:
        """Return one page of non-deleted transactions, newest first.

        Args:
            skip: Number of rows to skip (SQL OFFSET).
            limit: Maximum number of rows to return (SQL LIMIT).
        """
        return (
            db.query(Transaction)
            .filter(Transaction.deleted_at.is_(None))
            .order_by(Transaction.transaction_date.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )

    async def get_transaction(self, db: Session, transaction_id: int) -> Optional[Transaction]:
        """Return one non-deleted transaction with its items (down to
        ``product_direct``), vendors, customers and marketplaces eagerly
        loaded, or None when it does not exist."""
        return (
            db.query(Transaction)
            .options(
                joinedload(Transaction.transaction_items)
                .joinedload(TransactionItem.inventory_item)
                .joinedload(InventoryItem.physical_item)
                .joinedload(PhysicalItem.product_direct),
                joinedload(Transaction.vendors),
                joinedload(Transaction.customers),
                joinedload(Transaction.marketplaces),
            )
            .filter(Transaction.id == transaction_id)
            .filter(Transaction.deleted_at.is_(None))
            .first()
        )

    async def create_expected_value(self, db: Session, expected_value_data: SealedExpectedValueCreate) -> SealedExpectedValue:
        """Create and flush a SealedExpectedValue from ``expected_value_data``."""
        with db_transaction(db):
            expected_value = SealedExpectedValue(
                tcgplayer_product_id=expected_value_data.tcgplayer_product_id,
                expected_value=expected_value_data.expected_value,
            )
            db.add(expected_value)
            db.flush()
        return expected_value

    async def create_purchase_transaction(
        self,
        db: Session,
        transaction_data: PurchaseTransactionCreate
    ) -> Transaction:
        """
        Creates a purchase transaction from a vendor.

        For each purchased line item, ``quantity`` physical items are created
        through the "case" or "box" service with the line's unit price as cost
        basis, and each created item is linked to the transaction with a
        TransactionItem carrying that same unit price. The transaction total
        is the sum of ``unit_price * quantity`` over all lines.

        Raises:
            ValueError: If a line's ``item_type`` is neither "case" nor "box".
        """
        with db_transaction(db):
            # Create the transaction
            transaction = Transaction(
                vendor_id=transaction_data.vendor_id,
                transaction_type='purchase',
                transaction_date=transaction_data.transaction_date,
                transaction_notes=transaction_data.transaction_notes,
            )
            db.add(transaction)
            db.flush()

            case_service = self.get_service("case")
            box_service = self.get_service("box")

            total_amount = 0
            # Pair every created physical item with the unit price of the line
            # that produced it, so a later line cannot overwrite the price.
            priced_items = []

            for item in transaction_data.items:
                # TODO: remove is_case and num_boxes, should derive from product_id
                # TODO: add support for purchasing single cards
                if item.item_type == "case":
                    for _ in range(item.quantity):
                        physical_item = await case_service.create_case(
                            db=db,
                            product_id=item.product_id,
                            cost_basis=item.unit_price,
                            num_boxes=item.num_boxes,
                        )
                        priced_items.append((physical_item, item.unit_price))
                elif item.item_type == "box":
                    for _ in range(item.quantity):
                        physical_item = await box_service.create_box(
                            db=db,
                            product_id=item.product_id,
                            cost_basis=item.unit_price,
                        )
                        priced_items.append((physical_item, item.unit_price))
                else:
                    raise ValueError(f"Invalid item type: {item.item_type}")
                total_amount += item.unit_price * item.quantity

            # create_box/create_case in this file add the InventoryItem without
            # flushing afterwards; flush so the ids read below are populated.
            db.flush()

            for physical_item, unit_price in priced_items:
                transaction.transaction_items.append(TransactionItem(
                    inventory_item_id=physical_item.inventory_item.id,
                    unit_price=unit_price,
                ))

            # Update transaction total
            transaction.transaction_total_amount = total_amount
        return transaction

    async def create_customer(
        self,
        db: Session,
        customer_name: str
    ) -> Customer:
        """Return the customer named ``customer_name``, creating it if absent."""
        existing_customer = db.query(Customer).filter(Customer.name == customer_name).first()
        if existing_customer:
            return existing_customer
        with db_transaction(db):
            customer = Customer(name=customer_name)
            db.add(customer)
            db.flush()
        return customer

    async def create_vendor(
        self,
        db: Session,
        vendor_name: str
    ) -> Vendor:
        """Return the vendor named ``vendor_name``, creating it if absent."""
        existing_vendor = db.query(Vendor).filter(Vendor.name == vendor_name).first()
        if existing_vendor:
            return existing_vendor
        with db_transaction(db):
            vendor = Vendor(name=vendor_name)
            db.add(vendor)
            db.flush()
        return vendor

    async def get_vendors(
        self,
        db: Session
    ) -> List[Vendor]:
        """Return all vendors (no ``deleted_at`` filter is applied)."""
        return db.query(Vendor).all()

    async def create_marketplace(
        self,
        db: Session,
        marketplace_name: str
    ) -> Marketplace:
        """Return the marketplace named ``marketplace_name``, creating it if absent."""
        existing_marketplace = db.query(Marketplace).filter(Marketplace.name == marketplace_name).first()
        if existing_marketplace:
            return existing_marketplace
        with db_transaction(db):
            marketplace = Marketplace(name=marketplace_name)
            db.add(marketplace)
            db.flush()
        return marketplace

    async def get_marketplaces(
        self,
        db: Session
    ) -> List[Marketplace]:
        """Return all marketplaces (no ``deleted_at`` filter is applied)."""
        return db.query(Marketplace).all()
class BoxService(BaseService[Box]):
    """Creates sealed boxes and opens them into individual cards."""

    def __init__(self):
        super().__init__(Box)

    async def create_box(
        self,
        db: Session,
        product_id: int,
        cost_basis: float
    ) -> Box:
        """Create a Box for ``product_id`` with an InventoryItem at ``cost_basis``.

        The box's ``expected_value`` is copied from its product's sealed
        expected value. The InventoryItem is added to the session but not
        flushed here.
        """
        with db_transaction(db):
            box = Box(tcgplayer_product_id=product_id)
            db.add(box)
            db.flush()  # Get the ID for relationships

            box.expected_value = box.products.sealed_expected_value.expected_value
            db.flush()

            # Create the InventoryItem for the sealed box
            db.add(InventoryItem(physical_item=box, cost_basis=cost_basis))
        return box

    async def calculate_cost_basis_for_opened_cards(self, db: Session, open_event: OpenEvent) -> None:
        """Spread the opened box's cost basis across the cards it produced.

        Each card receives ``(card market price / box expected value) * box
        cost basis``, written to the card's inventory item.

        Raises:
            ValueError: If a resulting item is not a card, or if the box's
                expected value is zero or missing (no ratio can be computed).
        """
        source_item = open_event.source_item
        box_cost_basis = source_item.inventory_item.cost_basis
        box_expected_value = source_item.products.sealed_expected_value.expected_value
        if not box_expected_value:
            # Fail with a clear message instead of an opaque division error.
            raise ValueError(
                f"Cannot allocate cost basis: expected value is {box_expected_value!r}"
            )

        for resulting_card in open_event.resulting_items:
            # ensure card
            if resulting_card.item_type != "card":
                raise ValueError(f"Expected card, got {resulting_card.item_type}")
            market_value = resulting_card.products.most_recent_tcgplayer_price.market_price
            resulting_card.inventory_item.cost_basis = (
                market_value / box_expected_value
            ) * box_cost_basis
        db.flush()

    async def open_box(self, db: Session, box: Box, manabox_file_uploads: List[FileInDB]) -> OpenEvent:
        """Record the opening of ``box`` and create the cards it contained.

        One Card plus InventoryItem is created per unit of ``quantity`` on
        every ManaboxImportStaging row belonging to the given uploads. Cards
        start at cost basis 0 and are then assigned a share of the box's cost
        basis by ``calculate_cost_basis_for_opened_cards``.

        Returns:
            The OpenEvent that links the box to its resulting cards.
        """
        with db_transaction(db):
            # create open event
            open_event = OpenEvent(
                source_item=box,
                open_date=datetime.now()
            )
            db.add(open_event)
            db.flush()

            manabox_upload_ids = [upload.id for upload in manabox_file_uploads]
            staging_data = (
                db.query(ManaboxImportStaging)
                .filter(ManaboxImportStaging.file_id.in_(manabox_upload_ids))
                .all()
            )

            for record in staging_data:
                for _ in range(record.quantity):
                    open_card = Card(
                        tcgplayer_product_id=record.tcgplayer_product_id,
                        tcgplayer_sku_id=record.tcgplayer_sku_id
                    )
                    open_event.resulting_items.append(open_card)
                    db.add(InventoryItem(physical_item=open_card, cost_basis=0))
            # Persist the new cards before their prices are read below.
            db.flush()

            # calculate cost basis for opened cards
            await self.calculate_cost_basis_for_opened_cards(db, open_event)
        return open_event
class CaseService(BaseService[Case]):
    """Creates sealed cases and splits them into sealed boxes."""

    def __init__(self):
        super().__init__(Case)

    async def create_case(self, db: Session, product_id: int, cost_basis: float, num_boxes: int) -> Case:
        """Create a Case of ``num_boxes`` boxes for ``product_id`` together
        with an InventoryItem carrying ``cost_basis``.

        The case's ``expected_value`` is copied from its product's sealed
        expected value.
        """
        with db_transaction(db):
            # Create the SealedCase
            new_case = Case(tcgplayer_product_id=product_id, num_boxes=num_boxes)
            db.add(new_case)
            db.flush()  # Get the ID for relationships

            sealed_value = new_case.products.sealed_expected_value
            new_case.expected_value = sealed_value.expected_value

            # Create the InventoryItem for the sealed case
            case_inventory = InventoryItem(physical_item=new_case, cost_basis=cost_basis)
            db.add(case_inventory)
            return new_case

    async def open_case(self, db: Session, case: Case, child_product_id: int) -> bool:
        """Open ``case`` into ``case.num_boxes`` boxes of ``child_product_id``.

        An OpenEvent is recorded with the case as source; each new box gets an
        InventoryItem whose cost basis is the case's cost basis divided by the
        number of boxes. Always returns True.
        """
        ## TODO should be able to import a manabox file with a case
        ## cost basis will be able to flow down to the card accurately
        with db_transaction(db):
            # Create the OpenEvent
            event = OpenEvent(source_item=case, open_date=datetime.now())
            db.add(event)
            db.flush()  # Get the ID for relationships

            # Create num_boxes SealedBoxes
            for _ in range(case.num_boxes):
                child_box = Box(tcgplayer_product_id=child_product_id)
                event.resulting_items.append(child_box)
                db.flush()

                # Create the InventoryItem for the sealed box
                share = case.inventory_item.cost_basis / case.num_boxes
                db.add(InventoryItem(physical_item=child_box, cost_basis=share))
            return True
class MarketplaceListingService(BaseService[MarketplaceListing]):
    """Creates, prices and confirms marketplace listings, and builds the
    TCGplayer "add inventory" CSV for the cards of an open event."""

    def __init__(self):
        super().__init__(MarketplaceListing)
        self.pricing_service = self.service_manager.get_service("pricing")

    @staticmethod
    def _escape_csv_value(value) -> str:
        """Render one CSV field: None becomes "", and values containing a
        comma, double quote or line break are quoted with quotes doubled."""
        if value is None:
            return ""
        text = str(value)
        if any(ch in text for ch in (',', '"', '\n', '\r')):
            # Built by concatenation: reusing the same quote character inside
            # an f-string expression is a SyntaxError before Python 3.12.
            return '"' + text.replace('"', '""') + '"'
        return text

    async def create_marketplace_listing(self, db: Session, inventory_item: InventoryItem, marketplace: Marketplace) -> MarketplaceListing:
        """Create an unlisted MarketplaceListing for ``inventory_item`` on
        ``marketplace``, with its recommended price taken from the pricing
        service. ``listing_date`` and ``delisting_date`` start as None."""
        with db_transaction(db):
            recommended_price = await self.pricing_service.set_price(db, inventory_item)
            logger.info(f"recommended_price: {recommended_price.price}")
            marketplace_listing = MarketplaceListing(
                inventory_item=inventory_item,
                marketplace=marketplace,
                recommended_price=recommended_price,
                listing_date=None,
                delisting_date=None
            )
            db.add(marketplace_listing)
            db.flush()
        return marketplace_listing

    async def update_marketplace_listing_price(self, db: Session, marketplace_listing: MarketplaceListing) -> MarketplaceListing:
        """Re-price ``marketplace_listing`` and store the result as its
        ``listed_price``.

        ``set_price`` is awaited and given ``db``, matching how it is called
        in ``create_marketplace_listing``; previously the un-awaited call was
        assigned directly.
        """
        with db_transaction(db):
            marketplace_listing.listed_price = await self.pricing_service.set_price(
                db, marketplace_listing.inventory_item
            )
            db.flush()
        return marketplace_listing

    async def get_marketplace_listing(self, db: Session, inventory_item: InventoryItem, marketplace: Marketplace) -> Optional[MarketplaceListing]:
        """Return the newest listing of ``inventory_item`` on ``marketplace``
        that is neither delisted nor deleted, or None."""
        return (
            db.query(MarketplaceListing)
            .filter(MarketplaceListing.inventory_item == inventory_item)
            .filter(MarketplaceListing.delisting_date.is_(None))
            .filter(MarketplaceListing.deleted_at.is_(None))
            .filter(MarketplaceListing.marketplace == marketplace)
            .order_by(MarketplaceListing.created_at.desc())
            .first()
        )

    async def get_active_marketplace_listing(self, db: Session, inventory_item: InventoryItem, marketplace: Marketplace) -> Optional[MarketplaceListing]:
        """Same as ``get_marketplace_listing`` but additionally requires the
        listing to have a ``listing_date`` set."""
        return (
            db.query(MarketplaceListing)
            .filter(MarketplaceListing.inventory_item == inventory_item)
            .filter(MarketplaceListing.delisting_date.is_(None))
            .filter(MarketplaceListing.deleted_at.is_(None))
            .filter(MarketplaceListing.marketplace == marketplace)
            .filter(MarketplaceListing.listing_date.isnot(None))
            .order_by(MarketplaceListing.created_at.desc())
            .first()
        )

    async def confirm_listings(self, db: Session, open_event: OpenEvent, marketplace: Marketplace) -> str:
        """Build the TCGplayer add file for ``open_event`` and stamp every
        resulting item's listing with the current time as ``listing_date``.

        Returns:
            The CSV text produced by ``create_tcgplayer_add_file``.

        Raises:
            ValueError: If the CSV is empty or an item has no listing.
        """
        tcgplayer_add_file = await self.create_tcgplayer_add_file(db, open_event, marketplace)
        if not tcgplayer_add_file:
            raise ValueError("No TCGplayer add file created")
        with db_transaction(db):
            for resulting_item in open_event.resulting_items:
                marketplace_listing = await self.get_marketplace_listing(db, resulting_item.inventory_item, marketplace)
                if marketplace_listing is None:
                    raise ValueError(f"No active marketplace listing found for inventory item id {resulting_item.inventory_item.id} in {marketplace.name}")
                marketplace_listing.listing_date = datetime.now()
            db.flush()
        return tcgplayer_add_file

    async def create_tcgplayer_add_file(self, db: Session, open_event: OpenEvent, marketplace: Marketplace) -> str:
        """Build the TCGplayer add-inventory CSV for ``open_event``'s items.

        One row is emitted per distinct ``tcgplayer_sku_id``; further items
        with the same SKU only increment that row's "Add to Quantity". The
        marketplace price is the average recommended price over the not yet
        listed, non-delisted, non-deleted listings of inventory items with
        that SKU, falling back to the current item's own recommended price
        when there are none.

        Returns:
            CSV text: a header row followed by one row per SKU, joined with
            "\\n" and without a trailing newline.

        Raises:
            ValueError: If a resulting item has no listing on ``marketplace``.
        """
        headers = [
            "TCGplayer Id",
            "Product Line",
            "Set Name",
            "Product Name",
            "Title",
            "Number",
            "Rarity",
            "Condition",
            "TCG Market Price",
            "TCG Direct Low",
            "TCG Low Price With Shipping",
            "TCG Low Price",
            "Total Quantity",
            "Add to Quantity",
            "TCG Marketplace Price",
            "Photo URL"
        ]
        data = {}
        for resulting_item in open_event.resulting_items:
            marketplace_listing = await self.get_marketplace_listing(db, resulting_item.inventory_item, marketplace)
            if marketplace_listing is None:
                raise ValueError(f"No active marketplace listing found for inventory item id {resulting_item.inventory_item.id} in {marketplace.name}")

            tcgplayer_sku_id = resulting_item.tcgplayer_sku_id
            if tcgplayer_sku_id in data:
                data[tcgplayer_sku_id]["Add to Quantity"] += 1
                continue

            product = resulting_item.products
            price = product.most_recent_tcgplayer_price

            # Title-case each word of the SKU condition; foils get a " Foil" suffix.
            condition = " ".join(word.capitalize() for word in resulting_item.sku.condition.split(" "))
            if product.sub_type_name == "Foil":
                condition += " " + product.sub_type_name

            # Average the recommended price over unlisted listings of
            # inventory items with the same tcgplayer_sku_id.
            inventory_items = (
                db.query(InventoryItem)
                .filter(InventoryItem.physical_item.has(tcgplayer_sku_id=tcgplayer_sku_id))
                .all()
            )
            inventory_item_ids = [inventory_item.id for inventory_item in inventory_items]
            logger.debug(f"inventory_item_ids: {inventory_item_ids}")
            valid_listings = (
                db.query(MarketplaceListing)
                .filter(MarketplaceListing.inventory_item_id.in_(inventory_item_ids))
                .filter(MarketplaceListing.delisting_date.is_(None))
                .filter(MarketplaceListing.deleted_at.is_(None))
                .filter(MarketplaceListing.listing_date.is_(None))
                .all()
            )
            logger.debug(f"valid_listings: {valid_listings}")
            recommended_prices = [listing.recommended_price.price for listing in valid_listings]
            if recommended_prices:
                avg_recommended_price = sum(recommended_prices) / len(recommended_prices)
            else:
                # No unlisted listing matched (e.g. this item's listing already
                # has a listing_date); avoid dividing by zero.
                avg_recommended_price = marketplace_listing.recommended_price.price

            data[tcgplayer_sku_id] = {
                "TCGplayer Id": tcgplayer_sku_id,
                "Product Line": product.category.name,
                "Set Name": product.group.name,
                "Product Name": product.name,
                "Title": "",
                "Number": product.ext_number,
                "Rarity": product.ext_rarity,
                "Condition": condition,
                "TCG Market Price": price.market_price,
                "TCG Direct Low": price.direct_low_price,
                # NOTE(review): same source field as "TCG Low Price" - confirm
                # whether a with-shipping price should be used here instead.
                "TCG Low Price With Shipping": price.low_price,
                "TCG Low Price": price.low_price,
                "Total Quantity": "",
                "Add to Quantity": 1,
                "TCG Marketplace Price": f"{Decimal(avg_recommended_price):.2f}",
                "Photo URL": ""
            }

        # format data into csv
        header_row = ",".join(headers)
        data_rows = [
            ",".join(self._escape_csv_value(row[header]) for header in headers)
            for row in data.values()
        ]
        return "\n".join([header_row] + data_rows)