pokemans/scraper/app.py
2024-03-05 22:22:44 -05:00

165 lines
6.8 KiB
Python

from .app_log import LoggingManager
from .threads import Scheduler
from .costco import CostcoMonitor
class Application:
"""
Orchestrates the main application flow, including starting the submission stream,
managing periodic updates of post analytics, and initializing all necessary components
for the application to function.
Attributes:
reddit_monitor (RedditMonitor): Monitors Reddit for new or updated submissions.
webhook_notifier: Notifies external services via webhooks when certain actions occur.
api_conn: Manages API connections and requests.
post_manager (PostManager): Manages CRUD operations for posts.
post_analytics_manager (PostAnalyticsManager): Manages analytics for posts.
submission_manager (SubmissionManager): Manages the processing of Reddit submissions.
log_manager (LoggingManager): Centralized logging for the application.
scheduler: Manages the scheduling of periodic updates.
costco_manager (CostcoManager): Manages Costco product data.
"""
def __init__(
self,
reddit_monitor,
webhook_notifier,
api_conn,
post_manager,
post_analytics_manager,
submission_manager,
costco_manager,
):
"""
Initializes the application with all necessary components.
Parameters:
reddit_monitor (RedditMonitor): The component for monitoring Reddit submissions.
webhook_notifier: The notifier for sending updates via webhooks.
api_conn: The API connection manager.
post_manager (PostManager): The manager for post operations.
post_analytics_manager (PostAnalyticsManager): The manager for post analytics operations.
submission_manager (SubmissionManager): The manager for processing Reddit submissions.
"""
self.reddit_monitor = reddit_monitor
self.webhook_notifier = webhook_notifier
self.api_conn = api_conn
self.post_manager = post_manager
self.post_analytics_manager = post_analytics_manager
self.costco_manager = costco_manager
self.log_manager = LoggingManager("scraper.log")
self.submission_manager = submission_manager
self.scheduler = Scheduler()
# how often should post analytics be updated (call for update and database update are separate)
self.update_analytics_frequency = 60 * 15 # every 15 minutes
self.scrape_costco_frequency = 60 * 60 * 4 # every 4 hours
def update_analytics(self):
"""
Executes periodic updates for post analytics based on the predefined frequency.
"""
self.log_manager.info("Running periodic analytics update")
to_be_updated = self.post_manager.get_posts_from_last_7_days()
submissions = self.reddit_monitor.update_submissions(to_be_updated)
self.submission_manager.process_submissions(
submissions, self.update_analytics_frequency
)
def scrape_costco(self):
"""
Executes periodic updates for Costco products based on the predefined frequency.
"""
self.log_manager.info("Running periodic Costco scrape")
costco_monitor = CostcoMonitor(
"https://www.costco.com/CatalogSearch?dept=All&keyword=pokemon"
)
fetched_products = costco_monitor.get_products()
costco_monitor.close()
# Fetch existing products from the database, assuming it returns a list directly
existing_products = self.costco_manager.get_all_costco_products()
# Containers for updates
products_to_update = []
products_to_insert = []
# Mapping existing products for quick lookup
existing_products_map = {
product["sku"]: product for product in existing_products
}
for product in fetched_products:
existing_product = existing_products_map.get(product.sku)
if existing_product:
self.log_manager.log(f"Found existing product: {product.sku}")
needs_update = False
# Compare and decide if an update is necessary (for price change, activation/deactivation)
if existing_product["price"] != product.price:
existing_product["price"] = product.price
needs_update = True
if existing_product["active"] != product.active:
existing_product["active"] = product.active
needs_update = True
if needs_update:
products_to_update.append(existing_product)
else:
self.log_manager.log(f"Adding new product: {product.sku}")
products_to_insert.append(product)
# Update existing products in the database if necessary
for product in products_to_update:
self.costco_manager.update_costco_product(product)
# Insert new products into the database
for product in products_to_insert:
self.costco_manager.insert_costco_product(product)
# Optionally, deactivate products not found in the latest fetch
skus_fetched = {product.sku for product in fetched_products}
products_to_deactivate = [
product
for product in existing_products
if product["sku"] not in skus_fetched and product["active"]
]
for product in products_to_deactivate:
product["active"] = False
self.costco_manager.update_costco_product(product)
# Send notifications for new products
for product in products_to_insert:
self.webhook_notifier.costco_notification(product)
def add_scheduler_task(self, name, task, interval):
"""
Registers a task with the scheduler to be run at a specified interval.
Parameters:
name (str): Name of the task.
task (callable): The task function to be executed.
interval (int): The frequency in seconds at which the task should be executed.
"""
self.scheduler.add_task(name, task, interval)
def run(self):
"""
Starts the main application process, including streaming submissions, running periodic updates,
and processing submissions.
"""
self.log_manager.info("Application started")
# tasks
self.add_scheduler_task(
"update_analytics", self.update_analytics, self.update_analytics_frequency
)
self.add_scheduler_task(
"scrape_costco", self.scrape_costco, self.scrape_costco_frequency
)
# Stream submissions and process them
submissions = self.reddit_monitor.stream_submissions()
self.submission_manager.process_submissions(
submissions, self.update_analytics_frequency
)