labels and stuff

This commit is contained in:
2025-04-13 21:11:55 -04:00
parent 56c2d1de47
commit 18b32c8514
26 changed files with 1627 additions and 25 deletions

View File

@@ -0,0 +1,77 @@
from typing import List, Dict, Optional, Literal
import csv
import os
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from weasyprint import HTML
class AddressLabelService:
def __init__(self):
self.template_dir = Path("app/data/assets/templates")
self.env = Environment(loader=FileSystemLoader(str(self.template_dir)))
self.templates = {
"dk1241": self.env.get_template("address_label_dk1241.html"),
"dk1201": self.env.get_template("address_label_dk1201.html")
}
self.return_address_path = "file://" + os.path.abspath("app/data/assets/images/ccrcardsaddress.png")
self.output_dir = "app/data/cache/tcgplayer/address_labels/"
os.makedirs(self.output_dir, exist_ok=True)
def generate_labels_from_csv(self, csv_path: str, label_type: Literal["dk1201", "dk1241"]) -> List[str]:
"""Generate address labels from a CSV file and save them as PDFs.
Args:
csv_path: Path to the CSV file containing address data
label_type: Type of label to generate ("6x4" or "dk1201")
Returns:
List of paths to generated PDF files
"""
generated_files = []
with open(csv_path, 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
# Generate label for each row
pdf_path = self._generate_single_label(row, label_type)
if pdf_path:
generated_files.append(str(pdf_path))
return generated_files
def _generate_single_label(self, row: Dict[str, str], label_type: Literal["dk1201", "dk1241"]) -> Optional[str]:
"""Generate a single address label PDF.
Args:
row: Dictionary containing address data
label_type: Type of label to generate ("6x4" or "dk1201")
Returns:
Path to the generated PDF file or None if generation failed
"""
try:
# Prepare template data
template_data = {
"recipient_name": f"{row['FirstName']} {row['LastName']}",
"address_line1": row['Address1'],
"address_line2": row['Address2'],
"city": row['City'],
"state": row['State'],
"zip_code": row['PostalCode']
}
# Add return address path only for 6x4 labels
if label_type == "dk1241":
template_data["return_address_path"] = self.return_address_path
# Render HTML
html_content = self.templates[label_type].render(**template_data)
# Generate PDF
pdf_path = self.output_dir + f"{row['Order #']}_{label_type}.pdf"
HTML(string=html_content).write_pdf(str(pdf_path))
return pdf_path
except Exception as e:
print(f"Error generating label for order {row.get('Order #', 'unknown')}: {str(e)}")
return None

View File

@@ -23,13 +23,19 @@ class BaseTCGPlayerService(BaseExternalService):
self.credentials = TCGPlayerCredentials()
def _get_headers(self, method: str) -> Dict[str, str]:
"""Get headers based on the HTTP method"""
def _get_headers(self, method: str, content_type: str = 'application/x-www-form-urlencoded') -> Dict[str, str]:
"""Get headers based on the HTTP method and content type
Args:
method: HTTP method (GET, POST, etc.)
content_type: Content type for the request. Defaults to 'application/x-www-form-urlencoded'
"""
base_headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'accept-language': 'en-US,en;q=0.8',
'priority': 'u=0, i',
'referer': 'https://store.tcgplayer.com/admin/pricing',
'referer': 'https://sellerportal.tcgplayer.com/',
'origin': 'https://sellerportal.tcgplayer.com',
'sec-ch-ua': '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
@@ -45,8 +51,7 @@ class BaseTCGPlayerService(BaseExternalService):
if method == 'POST':
post_headers = {
'cache-control': 'max-age=0',
'content-type': 'application/x-www-form-urlencoded',
'origin': 'https://store.tcgplayer.com'
'content-type': content_type
}
base_headers.update(post_headers)

View File

@@ -0,0 +1,81 @@
from typing import Any, Dict, Optional, Union
import logging
from app.services.external_api.tcgplayer.base_tcgplayer_service import BaseTCGPlayerService
import os
logger = logging.getLogger(__name__)
class OrderManagementService(BaseTCGPlayerService):
ORDER_MANAGEMENT_BASE_URL = "https://order-management-api.tcgplayer.com/orders"
def __init__(self):
super().__init__()
self.base_url = self.ORDER_MANAGEMENT_BASE_URL
self.API_VERSION: str = "?api-version=2.0"
self.SELLER_KEY: str = "e576ed4c"
self.order_search_endpoint = f"/search{self.API_VERSION}"
self.packing_slip_endpoint = f"/packing-slips/export{self.API_VERSION}"
self.pull_sheet_endpoint = f"/pull-sheets/export{self.API_VERSION}"
self.shipping_endpoint = f"/shipping/export{self.API_VERSION}"
async def get_orders(self):
search_from = 0
orders = []
while True:
payload = {
"searchRange": "LastThreeMonths",
"filters": {
"sellerKey": self.SELLER_KEY
},
"sortBy": [
{"sortingType": "orderStatus", "direction": "ascending"},
{"sortingType": "orderDate", "direction": "descending"}
],
"from": search_from,
"size": 25
}
response = await self._make_request("POST", self.order_search_endpoint, data=payload, headers=self._get_headers("POST", "application/json"), auth_required=True)
if len(response.get("orders")) == 0:
break
search_from += 25
orders.extend(response.get("orders"))
return orders
async def get_order(self, order_id: str):
response = await self._make_request("GET", f"{self.ORDER_MANAGEMENT_BASE_URL}/{order_id}{self.API_VERSION}")
return response
async def get_packing_slip(self, order_ids: list[str]):
payload = {
"sortingType": "byRelease",
"format": "default",
"timezoneOffset": -4,
"orderNumbers": order_ids
}
response = await self._make_request("POST", self.packing_slip_endpoint, data=payload, headers=self._get_headers("POST", "application/json"), auth_required=True, download_file=True)
return response
async def get_pull_sheet(self, order_ids: list[str]):
payload = {
"orderNumbers": order_ids,
"timezoneOffset": -4
}
response = await self._make_request("POST", self.pull_sheet_endpoint, data=payload, headers=self._get_headers("POST", "application/json"), auth_required=True, download_file=True)
return response
async def get_shipping_csv(self, order_ids: list[str]):
payload = {
"orderNumbers": order_ids,
"timezoneOffset": -4
}
response = await self._make_request("POST", self.shipping_endpoint, data=payload, headers=self._get_headers("POST", "application/json"), auth_required=True, download_file=True)
return response
async def save_file(self, file_data: bytes, file_name: str) -> str:
if not os.path.exists("app/data/cache/tcgplayer/orders"):
os.makedirs("app/data/cache/tcgplayer/orders")
file_path = f"app/data/cache/tcgplayer/orders/{file_name}"
with open(file_path, "wb") as f:
f.write(file_data)
return file_path

View File

@@ -0,0 +1,231 @@
from typing import Optional, Union, Literal
import aiohttp
from pathlib import Path
from pdf2image import convert_from_path
from brother_ql.conversion import convert
from brother_ql.raster import BrotherQLRaster
import logging
import asyncio
import time
from PIL import Image
from contextlib import asynccontextmanager
logger = logging.getLogger(__name__)
class LabelPrinterService:
def __init__(self, printer_api_url: str = "http://localhost:8000"):
"""Initialize the label printer service.
Args:
printer_api_url: Base URL of the printer API endpoint
"""
self.printer_api_url = printer_api_url.rstrip('/')
self.status_url = f"{self.printer_api_url}/status"
self.print_url = f"{self.printer_api_url}/print"
self.cache_dir = Path("app/data/cache/prints")
self.cache_dir.mkdir(parents=True, exist_ok=True)
self._session = None
self._lock = asyncio.Lock()
@asynccontextmanager
async def _get_session(self):
"""Context manager for aiohttp session."""
if self._session is None:
self._session = aiohttp.ClientSession()
try:
yield self._session
except Exception as e:
logger.error(f"Session error: {e}")
if self._session:
await self._session.close()
self._session = None
raise
async def _wait_for_printer_ready(self, max_wait: int = 300) -> bool:
"""Wait for the printer to be ready.
Args:
max_wait: Maximum time to wait in seconds
Returns:
bool: True if printer is ready, False if timeout
"""
start_time = time.time()
while time.time() - start_time < max_wait:
try:
async with self._get_session() as session:
async with session.get(self.status_url) as response:
if response.status == 200:
data = await response.json()
if data.get('status') == 'ready':
return True
elif data.get('status') == 'busy':
logger.info("Printer is busy, waiting...")
elif response.status == 404:
logger.error(f"Printer status endpoint not found at {self.status_url}")
return False
except aiohttp.ClientError as e:
logger.warning(f"Error checking printer status: {e}")
except Exception as e:
logger.error(f"Unexpected error in _wait_for_printer_ready: {e}")
return False
await asyncio.sleep(1)
logger.error("Timeout waiting for printer to be ready")
return False
async def _send_print_request(self, file_path: Union[str, Path]) -> bool:
"""Send print data to printer API.
Args:
file_path: Path to the binary data file to send to the printer
Returns:
bool: True if request was successful, False otherwise
"""
try:
# Read the binary data from the cache file
with open(file_path, "rb") as f:
print_data = f.read()
# Send the request to the printer API using aiohttp
async with self._get_session() as session:
async with session.post(
self.print_url,
data=print_data,
headers={'Content-Type': 'application/octet-stream'},
timeout=30
) as response:
if response.status == 200:
data = await response.json()
if data.get('message') == 'Print request processed successfully':
return True
logger.error(f"Unexpected success response: {data}")
return False
elif response.status == 404:
logger.error(f"Print endpoint not found at {self.print_url}")
return False
elif response.status == 429:
logger.error("Printer is busy")
return False
else:
data = await response.json()
logger.error(f"Print request failed with status {response.status}: {data.get('message')}")
return False
except aiohttp.ClientError as e:
logger.error(f"Error sending print request: {str(e)}")
return False
except Exception as e:
logger.error(f"Unexpected error in _send_print_request: {e}")
return False
async def print_file(self, file_path: Union[str, Path], label_size: Literal["dk1201", "dk1241"], label_type: Optional[Literal["address_label", "packing_slip"]] = None) -> bool:
"""Print a PDF or PNG file to the label printer.
Args:
file_path: Path to the PDF or PNG file
label_size: Size of label to use ("dk1201" or "dk1241")
label_type: Type of label to use ("address_label" or "packing_slip")
Returns:
bool: True if print was successful, False otherwise
"""
async with self._lock: # Ensure only one print operation at a time
try:
if file_path is None:
logger.error("No file path provided")
return False
file_path = Path(file_path)
if not file_path.exists():
logger.error(f"File not found: {file_path}")
return False
# Wait for printer to be ready
if not await self._wait_for_printer_ready():
logger.error("Printer not ready after waiting")
return False
if file_path.suffix.lower() == '.pdf':
# Convert PDF to images
logger.info(f"Converting PDF {file_path} to images")
images = convert_from_path(file_path)
if not images:
logger.error(f"No images could be extracted from {file_path}")
return False
logger.info(f"Successfully converted PDF to {len(images)} images")
else:
# For PNG files, we can use them directly
images = [Image.open(file_path)]
# Process each page
for i, image in enumerate(images):
logger.info(f"Processing page {i+1} with dimensions {image.size}")
# Resize image based on label size and type
resized_image = image.copy() # Create a copy to work with
# Store the original label size before we modify it
original_label_size = label_size
# Handle resizing based on label size and type
if original_label_size == "dk1241":
if label_type == "packing_slip":
resized_image = resized_image.resize((1164, 1660), Image.Resampling.LANCZOS)
elif label_type == "address_label":
resized_image = resized_image.resize((1660, 1164), Image.Resampling.LANCZOS)
else:
resized_image = resized_image.resize((1164, 1660), Image.Resampling.LANCZOS)
elif original_label_size == "dk1201" and label_type == "address_label":
resized_image = resized_image.resize((991, 306), Image.Resampling.LANCZOS)
# if file path contains address_label, rotate image 90 degrees
if label_type == "address_label":
rotate = "90"
else:
rotate = "0"
# Convert to label format
qlr = BrotherQLRaster("QL-1100")
qlr.exception_on_warning = True
# Get label size based on type
brother_label_size = "29x90" if original_label_size == "dk1201" else "102x152"
converted_image = convert(
qlr=qlr,
images=[resized_image],
label=brother_label_size,
rotate=rotate,
threshold=70.0,
dither=False,
compress=False,
red=False,
dpi_600=False,
#hq=True,
hq=False,
cut=True
)
# Cache the converted binary data
cache_path = self.cache_dir / f"{file_path.stem}_{brother_label_size}_page_{i+1}_converted.bin"
with open(cache_path, "wb") as f:
f.write(converted_image)
# Send to API
if not await self._send_print_request(cache_path):
logger.error(f"Failed to print page {i+1}")
return False
# Wait for printer to be ready before processing next page
if i < len(images) - 1: # Don't wait after the last page
if not await self._wait_for_printer_ready():
logger.error("Printer not ready for next page")
return False
return True
except Exception as e:
logger.error(f"Error printing file {file_path}: {str(e)}")
return False

View File

@@ -0,0 +1,182 @@
from typing import Optional, Union, Literal
import os
import aiohttp
import cups
from pathlib import Path
from pdf2image import convert_from_path
from brother_ql.conversion import convert
from brother_ql.raster import BrotherQLRaster
import logging
import asyncio
import time
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class PrintService:
def __init__(self, printer_name: Optional[str] = None, printer_api_url: str = "http://localhost:8000/print",
min_print_interval: int = 30):
"""Initialize the print service.
Args:
printer_name: Name of the printer to use. If None, will use default printer.
printer_api_url: URL of the printer API endpoint
min_print_interval: Minimum time in seconds between print requests for label printer
"""
self.printer_name = printer_name
self.printer_api_url = printer_api_url
self.status_url = printer_api_url.replace('/print', '/status')
self.conn = cups.Connection()
self.cache_dir = Path("app/data/cache/prints")
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Rate limiting and coordination
self.min_print_interval = min_print_interval
self._last_print_time = None
self._print_lock = asyncio.Lock()
async def _wait_for_printer_ready(self, max_wait: int = 300) -> bool:
"""Wait for the printer to be ready.
Args:
max_wait: Maximum time to wait in seconds
Returns:
bool: True if printer is ready, False if timeout
"""
start_time = time.time()
while time.time() - start_time < max_wait:
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.status_url) as response:
if response.status == 200:
data = await response.json()
if data.get('status') == 'ready':
return True
except Exception as e:
logger.warning(f"Error checking printer status: {e}")
await asyncio.sleep(1)
logger.error("Timeout waiting for printer to be ready")
return False
async def print_file(self, file_path: Union[str, Path], printer_type: Literal["regular", "label"] = "regular", label_type: Optional[Literal["dk1201", "dk1241"]] = None) -> bool:
"""Print a PDF or PNG file.
Args:
file_path: Path to the PDF or PNG file
printer_type: Type of printer ("regular" or "label")
label_type: Type of label to use ("dk1201" or "dk1241"). Only used when printer_type is "label".
Returns:
bool: True if print was successful, False otherwise
"""
try:
file_path = Path(file_path)
if not file_path.exists():
logger.error(f"File not found: {file_path}")
return False
if printer_type == "regular":
# For regular printers, use CUPS
printer = self.printer_name or self.conn.getDefault()
if not printer:
logger.error("No default printer found")
return False
job_id = self.conn.printFile(
printer,
str(file_path),
f"{file_path.suffix.upper()} Print",
{}
)
logger.info(f"Print job {job_id} submitted to printer {printer}")
return True
else:
# For label printers, we need to coordinate requests
if label_type is None:
logger.error("label_type must be specified when printing to label printer")
return False
# Wait for printer to be ready
if not await self._wait_for_printer_ready():
logger.error("Printer not ready after waiting")
return False
if file_path.suffix.lower() == '.pdf':
# Convert PDF to image first
images = convert_from_path(file_path)
if not images:
logger.error(f"No images could be extracted from {file_path}")
return False
image = images[0] # Only use first page
else:
# For PNG files, we can use them directly
from PIL import Image
image = Image.open(file_path)
# Convert to label format
qlr = BrotherQLRaster("QL-1100")
qlr.exception_on_warning = True
# Get label size based on type
label_size = "29x90" if label_type == "dk1201" else "102x152"
converted_image = convert(
qlr=qlr,
images=[image],
label=label_size,
rotate="0",
threshold=70.0,
dither=False,
compress=False,
red=False,
dpi_600=False,
hq=True,
cut=True
)
# Cache the converted binary data
cache_path = self.cache_dir / f"{file_path.stem}_{label_type}_converted.bin"
with open(cache_path, "wb") as f:
f.write(converted_image)
# Send to API
return await self._send_print_request(cache_path)
except Exception as e:
logger.error(f"Error printing file {file_path}: {str(e)}")
return False
async def _send_print_request(self, file_path: Union[str, Path]) -> bool:
"""Send print data to printer API.
Args:
file_path: Path to the binary data file to send to the printer
Returns:
bool: True if request was successful, False otherwise
"""
try:
# Read the binary data from the cache file
with open(file_path, "rb") as f:
print_data = f.read()
# Send the request to the printer API using aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(
self.printer_api_url,
data=print_data,
timeout=30
) as response:
if response.status == 200:
return True
else:
response_text = await response.text()
logger.error(f"Print request failed with status {response.status}: {response_text}")
return False
except Exception as e:
logger.error(f"Error sending print request: {str(e)}")
return False

View File

@@ -0,0 +1,83 @@
from typing import List, Dict
import pandas as pd
from datetime import datetime
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from weasyprint import HTML
import logging
logger = logging.getLogger(__name__)
class PullSheetService:
def __init__(self):
self.template_dir = Path("app/data/assets/templates")
self.env = Environment(loader=FileSystemLoader(str(self.template_dir)))
self.template = self.env.get_template("pull_sheet.html")
self.output_dir = Path("app/data/cache/tcgplayer/pull_sheets")
self.output_dir.mkdir(parents=True, exist_ok=True)
def generate_pull_sheet_pdf(self, csv_path: str) -> str:
"""Generate a PDF pull sheet from a CSV file.
Args:
csv_path: Path to the CSV file containing pull sheet data
Returns:
Path to the generated PDF file
"""
try:
# Read and process CSV data
items = self._read_and_process_csv(csv_path)
# Prepare template data
template_data = {
'items': items,
'generation_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# Render HTML
html_content = self.template.render(**template_data)
# Generate PDF
pdf_path = self.output_dir / f"pull_sheet_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
HTML(string=html_content).write_pdf(str(pdf_path))
return str(pdf_path)
except Exception as e:
logger.error(f"Error generating pull sheet PDF: {str(e)}")
raise
def _read_and_process_csv(self, csv_path: str) -> List[Dict]:
"""Read and process CSV data using pandas.
Args:
csv_path: Path to the CSV file
Returns:
List of processed items
"""
# Read CSV into pandas DataFrame
df = pd.read_csv(csv_path)
# Filter out the "Orders Contained in Pull Sheet" row
df = df[df['Product Line'] != 'Orders Contained in Pull Sheet:']
# Convert Set Release Date to datetime
df['Set Release Date'] = pd.to_datetime(df['Set Release Date'], format='%m/%d/%Y %H:%M:%S')
# Sort by Set Release Date (descending) and then Product Name (ascending)
df = df.sort_values(['Set Release Date', 'Product Name'], ascending=[False, True])
# Convert to list of dictionaries
items = []
for _, row in df.iterrows():
items.append({
'product_name': row['Product Name'],
'condition': row['Condition'],
'quantity': str(int(row['Quantity'])), # Convert to string for template
'set': row['Set'],
'rarity': row['Rarity']
})
return items

View File

@@ -0,0 +1,55 @@
from typing import Optional, Union
import cups
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class RegularPrinterService:
def __init__(self, printer_name: Optional[str] = None):
"""Initialize the regular printer service.
Args:
printer_name: Name of the printer to use. If None, will use default printer.
"""
self.printer_name = printer_name
self.conn = cups.Connection()
async def print_file(self, file_path: Union[str, Path]) -> bool:
"""Print a PDF or PNG file to the regular printer.
Args:
file_path: Path to the PDF or PNG file
Returns:
bool: True if print was successful, False otherwise
"""
try:
if file_path is None:
logger.error("No file path provided")
return False
file_path = Path(file_path)
if not file_path.exists():
logger.error(f"File not found: {file_path}")
return False
# Get the printer
printer = self.printer_name or self.conn.getDefault()
if not printer:
logger.error("No default printer found")
return False
# Submit the print job
job_id = self.conn.printFile(
printer,
str(file_path),
f"{file_path.suffix.upper()} Print",
{}
)
logger.info(f"Print job {job_id} submitted to printer {printer}")
return True
except Exception as e:
logger.error(f"Error printing file {file_path}: {str(e)}")
return False