""" CCR ETL MTGJSON Processing Script This script handles the extraction, transformation, and loading of MTGJSON data into a PostgreSQL database. It supports downloading, unzipping, preprocessing, and batch inserting of various data formats. """ import json import os import yaml from typing import Union from zipfile import ZipFile import psycopg2 import requests import wmill from sqlalchemy import create_engine, text from sqlalchemy.engine import Engine # Configuration paths DB_RESOURCE_PATH = 'u/joshuakrzemien/slick_postgresql' DB_CONFIG_PATH = 'f/CCR_ETL/ccr_db_config' EXTRACT_CONFIG_PATH = 'f/CCR_ETL/ccr_extract_config' DOWNLOAD_CONFIG_PATH = './shared/' # Default processing parameters DEFAULT_BATCH_SIZE = 1000 def validate_response_headers(response: requests.Response, outer_file_type: str) -> None: """Validate that the response content type matches the expected file type.""" if response.headers['Content-Type'] != f'application/{outer_file_type}': raise ValueError(f"Expected {outer_file_type} response, got {response.headers['Content-Type']}") def download_request(url: str, filename: str, outer_file_type: str) -> bytes: """Download a file from the given URL and validate its content type.""" print(f"šŸ”½ Downloading {filename} from {url}") response = requests.get(url) response.raise_for_status() validate_response_headers(response, outer_file_type) print(f"āœ… Download successful ({response.headers.get('Content-Length', 'Unknown')} bytes)") return response.content def generate_download_queue(url: str, filename: str, outer_file_type: str, iterables: dict) -> list: """ Generate a queue of download items based on URL templates and iterable values. Example: url = "https://tcgcsv.com/tcgplayer/{game_id}/groups" iterables = {'game_id': [1,3,65,71,86]} """ queue = [] for key, value in iterables.items(): for item in value: queue_item = { 'url': url.format(key=key, value=item), 'filename': filename.format(key=key, value=item), 'outer_file_type': outer_file_type, } queue.append(queue_item) return queue def save_file(content: bytes, filename: str) -> None: """Save binary content to a file in the download directory.""" filepath = DOWNLOAD_CONFIG_PATH + filename with open(filepath, 'wb') as f: f.write(content) print(f"šŸ’¾ Saved {len(content)} bytes to {filename}") def unzip_file(filename: str) -> str: """Extract a zip file and return the name of the extracted content.""" new_filename = filename.replace('.zip', '') zip_path = DOWNLOAD_CONFIG_PATH + filename with ZipFile(zip_path, 'r') as zip_ref: file_list = zip_ref.namelist() print(f"šŸ“¦ Extracting {len(file_list)} files from {filename}") zip_ref.extractall(DOWNLOAD_CONFIG_PATH) return new_filename def load_file(filename: str, file_type: str) -> Union[dict, list]: """Load and parse a file from the download directory.""" filepath = DOWNLOAD_CONFIG_PATH + filename if file_type == 'json': with open(filepath, 'r') as f: data = json.load(f) print(f"šŸ“– Loaded {file_type} file: {filename}") return data else: raise ValueError(f"Unsupported file type: {file_type}") def build_record_from_config(source_data: dict, expected_columns: list, additional_data: dict = None) -> dict: """ Build a record using the structure defined in the extract config. 
def build_record_from_config(source_data: dict, expected_columns: list, additional_data: dict = None) -> dict:
    """
    Build a record using the structure defined in the extract config.

    Args:
        source_data: The source data dictionary
        expected_columns: List of column definitions from config
        additional_data: Optional additional data to merge (e.g., parent UUID)

    Returns:
        Dictionary representing a single database record
    """
    if additional_data is None:
        additional_data = {}

    # Merge source data with additional data (like uuid from parent structure)
    combined_data = {**source_data, **additional_data}

    record = {}
    for column in expected_columns:
        col_name = column['name']

        # Skip auto-increment columns (like 'id')
        if column.get('auto_increment', False):
            continue

        # Get value from combined data, use empty string as default
        record[col_name] = combined_data.get(col_name, '')

    return record


def create_db_engine(db: dict) -> Engine:
    """Create and test a database engine connection."""
    db_url = f"postgresql+psycopg2://postgres:{db['password']}@{db['host']}:{db['port']}/{db['dbname']}"
    engine = create_engine(db_url)

    # Test connection
    conn = engine.connect()
    conn.close()
    print(f"šŸ”Œ Connected to database: {db['host']}:{db['port']}/{db['dbname']}")
    return engine


def get_db_engine() -> Engine:
    """Get a database engine using the configured resource."""
    db = wmill.client.get_resource(DB_RESOURCE_PATH)
    return create_db_engine(db)


def generic_preprocess(
    data: Union[dict, list],
    expected_columns: list,
    config: dict
) -> list:
    """
    Generic data preprocessing function that handles various data structures.

    Args:
        data: Source data (dict or list)
        expected_columns: List of column definitions
        config: Preprocessing configuration

    Returns:
        List of processed records
    """
    # Step 1: Follow data path
    data_path = config.get("data_path", [])
    for key in data_path:
        if not isinstance(data, dict):
            raise ValueError(f"Expected dict while navigating path, got {type(data)} at key '{key}'")
        data = data.get(key)
        if data is None:
            raise ValueError(f"Missing key '{key}' in data path: {data_path}")

    # Step 2: Handle nested structure
    nested = config.get("nested", False)
    nested_key = config.get("nested_key", None)
    id_key = config.get("id_key", None)
    flatten = config.get("flatten", False)

    records = []

    if isinstance(data, dict):
        items = data.items()
    elif isinstance(data, list):
        items = enumerate(data)
    else:
        raise ValueError(f"Unsupported data structure: {type(data)}")

    for outer_key, outer_value in items:
        if nested:
            if not isinstance(outer_value, list):
                continue
            for inner_value in outer_value:
                if id_key and not inner_value.get(id_key):
                    continue
                additional_data = {nested_key: outer_key} if nested_key else {}
                record = build_record_from_config(inner_value, expected_columns, additional_data)
                records.append(record)
        else:
            if not isinstance(outer_value, dict):
                continue
            if id_key and not outer_value.get(id_key):
                continue
            if flatten:
                nested_data = outer_value.get("identifiers", {})
                combined = {**nested_data, "uuid": outer_value.get("uuid")}
                record = build_record_from_config(combined, expected_columns)
            else:
                record = build_record_from_config(outer_value, expected_columns)
            records.append(record)

    print(f"šŸ”„ Processed {len(records)} records")
    return records


def control_batch(data: list, batch_size: int = DEFAULT_BATCH_SIZE):
    """Split data into batches for processing."""
    for i in range(0, len(data), batch_size):
        yield data[i:i + batch_size]
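

# Illustrative sketch only (never called by this script): control_batch yields
# successive slices, so 2,500 records with the default batch size come back as
# batches of 1000, 1000, and 500.
def _example_batching() -> list:
    return [len(batch) for batch in control_batch(list(range(2500)))]  # -> [1000, 1000, 500]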
def insert_data_into_table_batch(records: list, table: str, engine: Engine, batch_size: int = DEFAULT_BATCH_SIZE) -> None:
    """Insert records into database table in batches."""
    if not records:
        print("āš ļø No records to insert, skipping database operation")
        return

    print(f"šŸ’¾ Inserting {len(records)} records into {table} (batch size: {batch_size})")

    # Get column names from first record
    columns = list(records[0].keys())
    column_names = ', '.join(f'"{col}"' for col in columns)
    placeholders = ', '.join(f':{col}' for col in columns)
    insert_sql = f"INSERT INTO {table} ({column_names}) VALUES ({placeholders})"

    with engine.connect() as conn:
        batch_count = 0
        total_inserted = 0
        for batch in control_batch(records, batch_size):
            batch_count += 1
            conn.execute(text(insert_sql), batch)
            total_inserted += len(batch)
            if batch_count % 10 == 0:
                print(f"ā³ Inserted {total_inserted}/{len(records)} records...")
        conn.commit()

    print(f"āœ… Inserted {total_inserted} records in {batch_count} batches")


def process_job(job: dict) -> dict:
    """
    Process a single ETL job.

    Args:
        job: Job configuration dictionary

    Returns:
        Dictionary with job processing results
    """
    # Extract job parameters
    url = job.get('url')
    filename = job.get('filename')
    outer_file_type = job.get('outer_file_type')
    inner_file_type = job.get('inner_file_type')
    table = job.get('table')
    expected_columns = job.get('expected_columns')
    batch_size = job.get('batch_size', DEFAULT_BATCH_SIZE)
    preprocess_function_name = job.get('preprocess_function', 'generic_preprocess')
    preprocess_config = job.get('preprocess_config')
    active = job.get('active')
    iterables = job.get('iterables')

    print(f"\nšŸš€ Processing job for table '{table}'")

    if not active:
        print("āš ļø Job is not active, skipping")
        return {"status": "skipped"}

    # Resolve the preprocessing function by name
    if not isinstance(preprocess_function_name, str):
        raise ValueError(f"Invalid preprocess_function value: {preprocess_function_name!r}")
    preprocess_function = globals().get(preprocess_function_name)
    if not callable(preprocess_function):
        raise ValueError(f"Preprocessing function '{preprocess_function_name}' not found or not callable.")

    # Get database engine
    engine = get_db_engine()

    # Populate download queue
    if iterables:
        queue = generate_download_queue(url, filename, outer_file_type, iterables)
    else:
        queue = [{
            'url': url,
            'filename': filename,
            'outer_file_type': outer_file_type,
        }]

    # Process each queue item end to end: download, save, optionally unzip,
    # preprocess, and insert, so that every generated file is actually loaded.
    total_records = 0
    saved_filename = filename
    for queue_item in queue:
        item_filename = queue_item['filename']
        content = download_request(queue_item['url'], item_filename, queue_item['outer_file_type'])
        save_file(content, item_filename)

        # Handle file extraction if needed
        saved_filename = item_filename
        if outer_file_type == 'zip':
            saved_filename = unzip_file(item_filename)

        # Load and preprocess data
        data = load_file(saved_filename, inner_file_type)
        records = preprocess_function(data, expected_columns, preprocess_config)

        # Insert data into database
        insert_data_into_table_batch(records, table, engine, batch_size)
        total_records += len(records)

    result = {
        "status": "success",
        "table": table,
        "records_processed": total_records,
        "filename": saved_filename,
    }
    print(f"āœ… Job complete: {total_records} records processed for {table}")
    return result
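

# Illustrative sketch only: the shape of one entry under 'jobs' in the extract
# config YAML that process_job expects. Every value below (URL, filenames, table,
# column names) is a hypothetical placeholder, not the real CCR configuration.
# An optional 'iterables' mapping may also be supplied to expand templated URLs.
_EXAMPLE_JOB_YAML = """
jobs:
  - url: "https://example.com/AllData.json.zip"
    filename: "AllData.json.zip"
    outer_file_type: "zip"
    inner_file_type: "json"
    table: "cards"
    expected_columns:
      - name: "uuid"
      - name: "name"
    batch_size: 1000
    preprocess_function: "generic_preprocess"
    preprocess_config:
      data_path: ["data"]
      nested: false
    active: true
"""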
def main() -> dict:
    """
    Main ETL processing function.

    Returns:
        Dictionary with overall processing results
    """
    print("šŸŽÆ ETL Process Starting")
    print("=" * 50)

    # Load configuration
    config_yaml = wmill.get_variable(EXTRACT_CONFIG_PATH)
    config = yaml.safe_load(config_yaml)
    print(f"šŸ“‹ Processing {len(config['jobs'])} jobs")

    results = []
    successful_jobs = 0
    failed_jobs = 0

    for i, job in enumerate(config['jobs'], 1):
        print(f"\n--- Job {i}/{len(config['jobs'])} ---")
        try:
            result = process_job(job)
            results.append(result)
            successful_jobs += 1
        except Exception as e:
            error_result = {
                "status": "error",
                "table": job.get('table', 'unknown'),
                "error": str(e),
                "filename": job.get('filename', 'unknown'),
            }
            results.append(error_result)
            failed_jobs += 1
            print(f"āŒ Job {i} failed: {str(e)}")

    print("\nšŸ ETL Process Complete")
    print(f"āœ… Successful: {successful_jobs} | āŒ Failed: {failed_jobs} | šŸ“‹ Total: {len(results)}")

    return {
        "status": "completed",
        "jobs_processed": len(results),
        "successful_jobs": successful_jobs,
        "failed_jobs": failed_jobs,
        "results": results,
    }
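

# Assumption: Windmill invokes main() itself when this script runs as a job; this
# guard only exists so the script can also be executed directly for local testing.
if __name__ == "__main__":
    main()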