from app_log import LoggingManager
from threads import Scheduler
from costco import CostcoMonitor


class Application:
    """
    Orchestrates the main application flow, including starting the submission
    stream, managing periodic updates of post analytics, and initializing all
    necessary components for the application to function.

    Attributes:
        reddit_monitor (RedditMonitor): Monitors Reddit for new or updated submissions.
        webhook_notifier: Notifies external services via webhooks when certain actions occur.
        api_conn: Manages API connections and requests.
        post_manager (PostManager): Manages CRUD operations for posts.
        post_analytics_manager (PostAnalyticsManager): Manages analytics for posts.
        submission_manager (SubmissionManager): Manages the processing of Reddit submissions.
        log_manager (LoggingManager): Centralized logging for the application.
        scheduler: Manages the scheduling of periodic updates.
        costco_manager (CostcoManager): Manages Costco product data.
        update_analytics_frequency (int): Seconds between post-analytics refreshes.
        scrape_costco_frequency (int): Seconds between Costco scrapes.
    """

    # Search page scraped by scrape_costco(). Kept as a class attribute so it can
    # be overridden on a subclass or an instance without editing the method.
    COSTCO_SEARCH_URL = "https://www.costco.com/CatalogSearch?dept=All&keyword=pokemon"

    def __init__(
        self,
        reddit_monitor,
        webhook_notifier,
        api_conn,
        post_manager,
        post_analytics_manager,
        submission_manager,
        costco_manager,
    ):
        """
        Initializes the application with all necessary components.

        Parameters:
            reddit_monitor (RedditMonitor): The component for monitoring Reddit submissions.
            webhook_notifier: The notifier for sending updates via webhooks.
            api_conn: The API connection manager.
            post_manager (PostManager): The manager for post operations.
            post_analytics_manager (PostAnalyticsManager): The manager for post analytics operations.
            submission_manager (SubmissionManager): The manager for processing Reddit submissions.
            costco_manager (CostcoManager): The manager for reading and writing Costco product records.
        """
        self.reddit_monitor = reddit_monitor
        self.webhook_notifier = webhook_notifier
        self.api_conn = api_conn
        self.post_manager = post_manager
        self.post_analytics_manager = post_analytics_manager
        self.costco_manager = costco_manager
        self.log_manager = LoggingManager("scraper.log")
        self.submission_manager = submission_manager
        self.scheduler = Scheduler()

        # how often should post analytics be updated (call for update and database update are separate)
        self.update_analytics_frequency = 60 * 15  # every 15 minutes
        self.scrape_costco_frequency = 60 * 60 * 4  # every 4 hours

    def update_analytics(self):
        """
        Executes periodic updates for post analytics based on the predefined frequency.

        Re-fetches the posts from the last 7 days through the Reddit monitor and
        hands the refreshed submissions to the submission manager.
        """
        self.log_manager.info("Running periodic analytics update")
        to_be_updated = self.post_manager.get_posts_from_last_7_days()
        submissions = self.reddit_monitor.update_submissions(to_be_updated)
        self.submission_manager.process_submissions(
            submissions, self.update_analytics_frequency
        )

    def _fetch_costco_products(self):
        """Scrape COSTCO_SEARCH_URL with a CostcoMonitor and return its products, always closing the monitor."""
        costco_monitor = CostcoMonitor(self.COSTCO_SEARCH_URL)
        try:
            return costco_monitor.get_products()
        finally:
            # Close even when get_products() raises, so whatever the monitor
            # holds open is not leaked across scheduled runs.
            costco_monitor.close()

    @staticmethod
    def _merge_costco_changes(existing_product, product):
        """
        Copy a fetched product's price/active values into its database record.

        Parameters:
            existing_product (dict): Database record; mutated in place.
            product: Fetched product exposing ``price`` and ``active`` attributes.

        Returns:
            bool: True if either field differed and was overwritten.
        """
        changed = False
        if existing_product["price"] != product.price:
            existing_product["price"] = product.price
            changed = True
        if existing_product["active"] != product.active:
            existing_product["active"] = product.active
            changed = True
        return changed

    def scrape_costco(self):
        """
        Executes periodic updates for Costco products based on the predefined frequency.

        Reconciles the freshly scraped products with the database:

        * unknown SKUs are inserted and announced via the webhook notifier;
        * known SKUs whose price or active flag changed are updated;
        * active database products absent from this scrape are marked inactive.
        """
        self.log_manager.info("Running periodic Costco scrape")
        fetched_products = self._fetch_costco_products()

        # Fetch existing products from the database, assuming it returns a list
        # of dicts carrying at least "sku", "price" and "active" keys.
        existing_products = self.costco_manager.get_all_costco_products()
        existing_products_map = {
            product["sku"]: product for product in existing_products
        }

        products_to_update = []
        products_to_insert = []
        skus_fetched = set()

        for product in fetched_products:
            # A SKU listed more than once in a single scrape would otherwise be
            # inserted (and notified) twice; keep only its first occurrence.
            if product.sku in skus_fetched:
                continue
            skus_fetched.add(product.sku)

            existing_product = existing_products_map.get(product.sku)
            # NOTE(review): these per-product messages use log_manager.log while the
            # rest of the class uses .info — confirm LoggingManager.log takes a
            # single message argument.
            if existing_product:
                self.log_manager.log(f"Found existing product: {product.sku}")
                if self._merge_costco_changes(existing_product, product):
                    products_to_update.append(existing_product)
            else:
                self.log_manager.log(f"Adding new product: {product.sku}")
                products_to_insert.append(product)

        for product in products_to_update:
            self.costco_manager.update_costco_product(product)

        for product in products_to_insert:
            self.costco_manager.insert_costco_product(product)

        # Deactivate products that are still active in the database but were not
        # returned by this scrape.
        products_to_deactivate = [
            product
            for product in existing_products
            if product["sku"] not in skus_fetched and product["active"]
        ]
        for product in products_to_deactivate:
            product["active"] = False
            self.costco_manager.update_costco_product(product)

        # Notify only after all database writes for this run have been issued.
        for product in products_to_insert:
            self.webhook_notifier.costco_notification(product)

    def add_scheduler_task(self, name, task, interval):
        """
        Registers a task with the scheduler to be run at a specified interval.

        Parameters:
            name (str): Name of the task.
            task (callable): The task function to be executed.
            interval (int): The frequency in seconds at which the task should be executed.
        """
        self.scheduler.add_task(name, task, interval)

    def run(self):
        """
        Starts the main application process, including streaming submissions,
        running periodic updates, and processing submissions.

        Registers the analytics and Costco tasks with the scheduler, then hands
        the Reddit submission stream to the submission manager.
        """
        self.log_manager.info("Application started")

        # tasks
        self.add_scheduler_task(
            "update_analytics", self.update_analytics, self.update_analytics_frequency
        )
        self.add_scheduler_task(
            "scrape_costco", self.scrape_costco, self.scrape_costco_frequency
        )

        # Stream submissions and process them
        submissions = self.reddit_monitor.stream_submissions()
        self.submission_manager.process_submissions(
            submissions, self.update_analytics_frequency
        )