giga_tcg/app/services/util/_dataframe.py
2025-02-07 20:54:55 -05:00

72 lines
2.9 KiB
Python

import pandas as pd
from io import StringIO
from app.db.models import File
class ManaboxRow:
def __init__(self, row: pd.Series):
# Integer field
try:
self.manabox_id = int(row['manabox_id'])
except (ValueError, TypeError):
raise ValueError(f"manabox_id must be convertible to integer, got: {row['manabox_id']}")
# String fields with None/NaN handling
self.name = str(row['name']) if pd.notna(row['name']) else ''
self.set_code = str(row['set_code']) if pd.notna(row['set_code']) else ''
self.set_name = str(row['set_name']) if pd.notna(row['set_name']) else ''
self.collector_number = str(row['collector_number']) if pd.notna(row['collector_number']) else ''
self.foil = str(row['foil']) if pd.notna(row['foil']) else ''
self.rarity = str(row['rarity']) if pd.notna(row['rarity']) else ''
self.scryfall_id = str(row['scryfall_id']) if pd.notna(row['scryfall_id']) else ''
self.condition = str(row['condition']) if pd.notna(row['condition']) else ''
self.language = str(row['language']) if pd.notna(row['language']) else ''
self.quantity = str(row['quantity']) if pd.notna(row['quantity']) else ''
class TCGPlayerPricingRow:
def __init__(self, row: pd.Series):
self.tcgplayer_id = row['tcgplayer_id']
self.product_line = row['product_line']
self.set_name = row['set_name']
self.product_name = row['product_name']
self.title = row['title']
self.number = row['number']
self.rarity = row['rarity']
self.condition = row['condition']
self.tcg_market_price = row['tcg_market_price']
self.tcg_direct_low = row['tcg_direct_low']
self.tcg_low_price_with_shipping = row['tcg_low_price_with_shipping']
self.tcg_low_price = row['tcg_low_price']
self.total_quantity = row['total_quantity']
self.add_to_quantity = row['add_to_quantity']
self.tcg_marketplace_price = row['tcg_marketplace_price']
self.photo_url = row['photo_url']
class DataframeUtil:
def __init__(self):
pass
def format_df_columns(self, df: pd.DataFrame) -> pd.DataFrame:
df.columns = df.columns.str.lower()
df.columns = df.columns.str.replace(' ', '_')
return df
def file_to_df(self, file: File) -> pd.DataFrame:
with open(file.filepath, 'rb') as f:
content = f.read()
content = content.decode('utf-8')
df = pd.read_csv(StringIO(content))
df = self.format_df_columns(df)
return df
def csv_bytes_to_df(self, content: bytes) -> pd.DataFrame:
content = content.decode('utf-8')
df = pd.read_csv(StringIO(content))
df = self.format_df_columns(df)
return df
def df_to_csv_bytes(self, df: pd.DataFrame) -> bytes:
csv = df.to_csv(index=False)
return csv.encode('utf-8')