lots
This commit is contained in:
438
app/services/inventory_label_service.py
Normal file
438
app/services/inventory_label_service.py
Normal file
@@ -0,0 +1,438 @@
|
||||
from app.services.base_service import BaseService
|
||||
from app.models.inventory_management import InventoryLabel, InventoryLabelMetadata
|
||||
from app.schemas.inventory_label import InventoryLabelCreate, InventoryLabelUpdate, InventoryLabelDelete, InventoryLabelGet, InventoryLabelMetadataCreate
|
||||
from app.db.database import transaction as db_transaction
|
||||
import uuid as uuid_lib
|
||||
import re
|
||||
from sqlalchemy.orm import Session
|
||||
import qrcode
|
||||
from io import BytesIO
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
from typing import Optional
|
||||
|
||||
class InventoryLabelService(BaseService):
|
||||
def __init__(self):
|
||||
super().__init__(None)
|
||||
|
||||
def _convert_uuid_to_qr_code(self, uuid_string: str) -> bytes:
|
||||
"""
|
||||
Convert a UUID string to a QR code image as bytes.
|
||||
|
||||
Args:
|
||||
uuid_string: The UUID string to encode
|
||||
|
||||
Returns:
|
||||
bytes: The QR code image as bytes
|
||||
"""
|
||||
# Create QR code instance
|
||||
qr = qrcode.QRCode(
|
||||
version=1,
|
||||
error_correction=qrcode.constants.ERROR_CORRECT_M, # Medium error correction for better reliability
|
||||
box_size=8, # Smaller box size for better fit on labels
|
||||
border=2, # Smaller border to maximize QR code size
|
||||
)
|
||||
|
||||
# Add the UUID data to the QR code
|
||||
qr.add_data(uuid_string)
|
||||
qr.make(fit=True)
|
||||
|
||||
# Create the QR code image
|
||||
img = qr.make_image(fill_color="black", back_color="white")
|
||||
|
||||
# Convert to bytes
|
||||
img_buffer = BytesIO()
|
||||
img.save(img_buffer, format='PNG')
|
||||
img_buffer.seek(0)
|
||||
|
||||
return img_buffer.getvalue()
|
||||
|
||||
def _create_composite_label_image(self, qr_code_image: Image.Image, text: str, label_width: int = 991, label_height: int = 306) -> Image.Image:
|
||||
"""
|
||||
Create a composite image with QR code left-aligned and text right-aligned.
|
||||
|
||||
Args:
|
||||
qr_code_image: The QR code image to place on the left
|
||||
text: The text to place on the right
|
||||
label_width: Width of the label in pixels
|
||||
label_height: Height of the label in pixels
|
||||
|
||||
Returns:
|
||||
Image.Image: The composite label image
|
||||
"""
|
||||
# Create a new white canvas
|
||||
label_canvas = Image.new('RGB', (label_width, label_height), 'white')
|
||||
|
||||
# Calculate QR code size (square, fit within label height with margin)
|
||||
qr_margin = 20
|
||||
max_qr_size = label_height - (2 * qr_margin)
|
||||
qr_size = min(max_qr_size, label_width // 2) # QR takes up to half the width
|
||||
|
||||
# Resize QR code to fit
|
||||
resized_qr = qr_code_image.resize((qr_size, qr_size), Image.Resampling.LANCZOS)
|
||||
|
||||
# Position QR code on the left with margin
|
||||
qr_x = qr_margin
|
||||
qr_y = (label_height - qr_size) // 2 # Center vertically
|
||||
|
||||
# Paste QR code onto canvas
|
||||
label_canvas.paste(resized_qr, (qr_x, qr_y))
|
||||
|
||||
# Add text on the right side
|
||||
draw = ImageDraw.Draw(label_canvas)
|
||||
|
||||
# Try to use a default font, fall back to basic font if not available
|
||||
font_size = 24
|
||||
font = None
|
||||
|
||||
# Try multiple font paths
|
||||
font_paths = [
|
||||
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
|
||||
"/System/Library/Fonts/Arial.ttf",
|
||||
"/usr/share/fonts/TTF/Arial.ttf",
|
||||
"/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf"
|
||||
]
|
||||
|
||||
for font_path in font_paths:
|
||||
try:
|
||||
font = ImageFont.truetype(font_path, font_size)
|
||||
break
|
||||
except (OSError, IOError):
|
||||
continue
|
||||
|
||||
# Fall back to default font if no system font found
|
||||
if font is None:
|
||||
font = ImageFont.load_default()
|
||||
|
||||
# Calculate text position (right-aligned with margin)
|
||||
text_margin = 20
|
||||
text_x = label_width - text_margin
|
||||
text_y = label_height // 2 # Center vertically
|
||||
|
||||
# Get text bounding box to position it properly
|
||||
bbox = draw.textbbox((0, 0), text, font=font)
|
||||
text_width = bbox[2] - bbox[0]
|
||||
text_height = bbox[3] - bbox[1]
|
||||
|
||||
# Adjust position to right-align the text
|
||||
text_x = text_x - text_width
|
||||
text_y = text_y - (text_height // 2)
|
||||
|
||||
# Draw the text
|
||||
draw.text((text_x, text_y), text, fill='black', font=font)
|
||||
|
||||
return label_canvas
|
||||
|
||||
def _create_qr_code_with_text(self, uuid_string: str, text: str) -> bytes:
|
||||
"""
|
||||
Create a QR code image with text and return it as bytes.
|
||||
|
||||
Args:
|
||||
uuid_string: The UUID string to encode in QR code
|
||||
text: The text to display on the label
|
||||
|
||||
Returns:
|
||||
bytes: The composite image as bytes
|
||||
"""
|
||||
# Create QR code instance
|
||||
qr = qrcode.QRCode(
|
||||
version=1,
|
||||
error_correction=qrcode.constants.ERROR_CORRECT_M,
|
||||
box_size=8,
|
||||
border=2,
|
||||
)
|
||||
|
||||
# Add the UUID data to the QR code
|
||||
qr.add_data(uuid_string)
|
||||
qr.make(fit=True)
|
||||
|
||||
# Create the QR code image
|
||||
qr_image = qr.make_image(fill_color="black", back_color="white")
|
||||
|
||||
# Create composite image with text
|
||||
composite_image = self._create_composite_label_image(qr_image, text)
|
||||
|
||||
# Convert to bytes
|
||||
img_buffer = BytesIO()
|
||||
composite_image.save(img_buffer, format='PNG')
|
||||
img_buffer.seek(0)
|
||||
|
||||
return img_buffer.getvalue()
|
||||
|
||||
def create_qr_code_with_text_direct(self, uuid_string: str, text: str) -> bytes:
|
||||
"""
|
||||
Create a QR code image with text and return it as bytes directly.
|
||||
This method doesn't involve database operations and is useful for testing.
|
||||
|
||||
Args:
|
||||
uuid_string: The UUID string to encode in QR code
|
||||
text: The text to display on the label
|
||||
|
||||
Returns:
|
||||
bytes: The composite image as bytes
|
||||
"""
|
||||
return self._create_qr_code_with_text(uuid_string, text)
|
||||
|
||||
# create
|
||||
async def create_inventory_label(self, db: Session, inventory_label: InventoryLabelCreate, print: bool = True) -> InventoryLabel:
|
||||
file_service = self.get_service('file')
|
||||
# check if we have a upc
|
||||
if inventory_label.upc:
|
||||
# validate the upc
|
||||
if not self._is_valid_upc(inventory_label.upc):
|
||||
raise ValueError("Invalid UPC")
|
||||
# check if we have metadata
|
||||
if inventory_label.metadata:
|
||||
# validate the metadata
|
||||
for metadata in inventory_label.metadata:
|
||||
if not metadata.key or not metadata.value:
|
||||
raise ValueError("Invalid metadata")
|
||||
# generate a uuid
|
||||
label_uuid = str(uuid_lib.uuid4())
|
||||
with db_transaction(db):
|
||||
# create the inventory label
|
||||
inventory_label_model = InventoryLabel(uuid=label_uuid, upc=inventory_label.upc)
|
||||
db.add(inventory_label_model)
|
||||
db.flush()
|
||||
# add the metadata
|
||||
if inventory_label.metadata:
|
||||
for metadata in inventory_label.metadata:
|
||||
inventory_label_metadata_model = InventoryLabelMetadata(inventory_label_id=inventory_label_model.id, metadata_key=metadata.key, metadata_value=metadata.value)
|
||||
db.add(inventory_label_metadata_model)
|
||||
if print:
|
||||
# Create image with QR code and optional text
|
||||
if inventory_label.metadata and len(inventory_label.metadata) > 0:
|
||||
if inventory_label.upc:
|
||||
# add upc to metadata
|
||||
inventory_label.metadata.append(InventoryLabelMetadataCreate(key="upc", value=inventory_label.upc))
|
||||
# concat metadata key values separated by newlines and :
|
||||
text = "\n".join([f"{metadata.key}: {metadata.value}" for metadata in inventory_label.metadata])
|
||||
# Use composite image with QR code and text
|
||||
image_data = self._create_qr_code_with_text(label_uuid, text)
|
||||
else:
|
||||
# Use original QR code only
|
||||
image_data = self._convert_uuid_to_qr_code(label_uuid)
|
||||
|
||||
# save file
|
||||
filename = f"{label_uuid}.png"
|
||||
file_record = await file_service.save_file(
|
||||
db=db,
|
||||
file_data=image_data,
|
||||
filename=filename,
|
||||
subdir="inventory_labels",
|
||||
file_type="inventory_label",
|
||||
content_type="image/png",
|
||||
metadata={"uuid": label_uuid}
|
||||
)
|
||||
print_service = self.get_service('label_printer')
|
||||
await print_service.print_file(file_record.path, label_size="dk1201", label_type="inventory_label", copies=1)
|
||||
return inventory_label_model
|
||||
|
||||
# get
|
||||
|
||||
def classify_input_data(self, input_data: str) -> str:
|
||||
"""
|
||||
Classify input data as UPC, UUID, or other string with high accuracy.
|
||||
|
||||
Args:
|
||||
input_data: The string to classify
|
||||
|
||||
Returns:
|
||||
str: "upc", "uuid", or "other"
|
||||
"""
|
||||
if not input_data or not isinstance(input_data, str):
|
||||
return "other"
|
||||
|
||||
# Remove any whitespace
|
||||
input_data = input_data.strip()
|
||||
|
||||
# Check for UUID first (more specific pattern)
|
||||
if self._is_valid_uuid(input_data):
|
||||
return "uuid"
|
||||
|
||||
# Check for UPC code
|
||||
if self._is_valid_upc(input_data):
|
||||
return "upc"
|
||||
|
||||
return "other"
|
||||
|
||||
def _is_valid_uuid(self, uuid_string: str) -> bool:
|
||||
"""
|
||||
Validate if string is a proper UUID.
|
||||
|
||||
Args:
|
||||
uuid_string: String to validate
|
||||
|
||||
Returns:
|
||||
bool: True if valid UUID, False otherwise
|
||||
"""
|
||||
# UUID regex pattern for all versions
|
||||
uuid_pattern = re.compile(
|
||||
r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
|
||||
re.IGNORECASE
|
||||
)
|
||||
|
||||
if not uuid_pattern.match(uuid_string):
|
||||
return False
|
||||
|
||||
try:
|
||||
# Validate UUID structure and version
|
||||
uuid_obj = uuid_lib.UUID(uuid_string)
|
||||
# Accept all UUID versions (1, 3, 4, 5)
|
||||
return uuid_obj.version in [1, 3, 4, 5]
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def _is_valid_upc(self, upc_string: str) -> bool:
|
||||
"""
|
||||
Validate if string is a proper UPC code.
|
||||
|
||||
Args:
|
||||
upc_string: String to validate
|
||||
|
||||
Returns:
|
||||
bool: True if valid UPC, False otherwise
|
||||
"""
|
||||
# Remove any non-digit characters
|
||||
digits_only = re.sub(r'[^0-9]', '', upc_string)
|
||||
|
||||
# UPC-A must be exactly 12 digits
|
||||
if len(digits_only) == 12:
|
||||
return self._validate_upc_a_checksum(digits_only)
|
||||
|
||||
# UPC-E must be exactly 8 digits
|
||||
if len(digits_only) == 8:
|
||||
return self._validate_upc_e_checksum(digits_only)
|
||||
|
||||
# EAN-13 must be exactly 13 digits
|
||||
if len(digits_only) == 13:
|
||||
return self._validate_ean_13_checksum(digits_only)
|
||||
|
||||
return False
|
||||
|
||||
def _validate_upc_a_checksum(self, upc: str) -> bool:
|
||||
"""
|
||||
Validate UPC-A checksum.
|
||||
|
||||
Args:
|
||||
upc: 12-digit UPC string
|
||||
|
||||
Returns:
|
||||
bool: True if checksum is valid
|
||||
"""
|
||||
if len(upc) != 12 or not upc.isdigit():
|
||||
return False
|
||||
|
||||
# Calculate checksum
|
||||
total = 0
|
||||
for i in range(11):
|
||||
digit = int(upc[i])
|
||||
if i % 2 == 0: # Odd positions (0-indexed)
|
||||
total += digit * 3
|
||||
else: # Even positions
|
||||
total += digit
|
||||
|
||||
checksum = (10 - (total % 10)) % 10
|
||||
return checksum == int(upc[11])
|
||||
|
||||
def _validate_upc_e_checksum(self, upc: str) -> bool:
|
||||
"""
|
||||
Validate UPC-E checksum.
|
||||
|
||||
Args:
|
||||
upc: 8-digit UPC-E string
|
||||
|
||||
Returns:
|
||||
bool: True if checksum is valid
|
||||
"""
|
||||
if len(upc) != 8 or not upc.isdigit():
|
||||
return False
|
||||
|
||||
# Calculate checksum
|
||||
total = 0
|
||||
for i in range(7):
|
||||
digit = int(upc[i])
|
||||
if i % 2 == 0: # Odd positions (0-indexed)
|
||||
total += digit * 3
|
||||
else: # Even positions
|
||||
total += digit
|
||||
|
||||
checksum = (10 - (total % 10)) % 10
|
||||
return checksum == int(upc[7])
|
||||
|
||||
def _validate_ean_13_checksum(self, ean: str) -> bool:
|
||||
"""
|
||||
Validate EAN-13 checksum.
|
||||
|
||||
Args:
|
||||
ean: 13-digit EAN string
|
||||
|
||||
Returns:
|
||||
bool: True if checksum is valid
|
||||
"""
|
||||
if len(ean) != 13 or not ean.isdigit():
|
||||
return False
|
||||
|
||||
# Calculate checksum
|
||||
total = 0
|
||||
for i in range(12):
|
||||
digit = int(ean[i])
|
||||
if i % 2 == 0: # Even positions (0-indexed)
|
||||
total += digit
|
||||
else: # Odd positions
|
||||
total += digit * 3
|
||||
|
||||
checksum = (10 - (total % 10)) % 10
|
||||
return checksum == int(ean[12])
|
||||
|
||||
async def get_inventory_label(self, db: Session, inventory_label_get: InventoryLabelGet) -> InventoryLabel:
|
||||
"""
|
||||
Get an inventory label by classifying the input data and querying the appropriate field.
|
||||
|
||||
Args:
|
||||
inventory_label_get: InventoryLabelGet object containing input_data
|
||||
|
||||
Returns:
|
||||
InventoryLabel: The found inventory label or None
|
||||
"""
|
||||
# check if we have a uuid or upc
|
||||
if inventory_label_get.uuid:
|
||||
return self._get_by_uuid(db, inventory_label_get.uuid)
|
||||
elif inventory_label_get.upc:
|
||||
return self._get_by_upc(db, inventory_label_get.upc)
|
||||
else:
|
||||
# check if we have input_data
|
||||
if inventory_label_get.input_data:
|
||||
# classify the input data
|
||||
input_type = self.classify_input_data(inventory_label_get.input_data)
|
||||
if input_type == "upc":
|
||||
return self._get_by_upc(db, inventory_label_get.input_data)
|
||||
elif input_type == "uuid":
|
||||
return self._get_by_uuid(db, inventory_label_get.input_data)
|
||||
else:
|
||||
raise ValueError("Invalid input data")
|
||||
else:
|
||||
raise ValueError("Invalid input data")
|
||||
|
||||
def _get_by_upc(self, db: Session, upc: str) -> InventoryLabel:
|
||||
"""
|
||||
Get inventory label by UPC.
|
||||
|
||||
Args:
|
||||
upc: The UPC code to search for
|
||||
|
||||
Returns:
|
||||
InventoryLabel: The found inventory label or None
|
||||
"""
|
||||
return db.query(InventoryLabel).filter(InventoryLabel.upc == upc).first()
|
||||
|
||||
def _get_by_uuid(self, db: Session, uuid: str) -> InventoryLabel:
|
||||
"""
|
||||
Get inventory label by UUID.
|
||||
|
||||
Args:
|
||||
uuid: The UUID to search for
|
||||
|
||||
Returns:
|
||||
InventoryLabel: The found inventory label or None
|
||||
"""
|
||||
return db.query(InventoryLabel).filter(InventoryLabel.uuid == uuid).first()
|
Reference in New Issue
Block a user