From 2088a30d75b7d8c4959418daf89d77e938f10b6d Mon Sep 17 00:00:00 2001 From: zman Date: Tue, 5 Mar 2024 17:03:40 -0500 Subject: [PATCH] add costco scraper --- .gitignore | 2 +- scraper/Dockerfile | 27 ++- scraper/__init__.py | 0 scraper/api.py | 249 +++++++++++++++++++++ scraper/app.py | 458 +++------------------------------------ scraper/config.py | 1 - scraper/costco.py | 80 +++++++ scraper/main.py | 13 +- scraper/reddit.py | 156 +++++++++++++ scraper/requirements.txt | Bin 740 -> 1416 bytes scraper/threads.py | 63 ++++-- scraper/webhook.py | 21 ++ 12 files changed, 617 insertions(+), 453 deletions(-) create mode 100644 scraper/__init__.py create mode 100644 scraper/api.py create mode 100644 scraper/costco.py create mode 100644 scraper/reddit.py diff --git a/.gitignore b/.gitignore index 68bd868..455c876 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,4 @@ __pycache__ .venv *.sqlite3 -*.log \ No newline at end of file +*.log* \ No newline at end of file diff --git a/scraper/Dockerfile b/scraper/Dockerfile index 0c59270..be3b218 100644 --- a/scraper/Dockerfile +++ b/scraper/Dockerfile @@ -3,6 +3,31 @@ FROM python:3.11 # Set environment variables ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 +ENV DEBIAN_FRONTEND noninteractive + +# Install dependencies for Chrome +RUN apt-get update && apt-get install -y wget gnupg2 ca-certificates unzip \ + && wget -q -O - https://dl.google.com/linux/linux_signing_key.pub | apt-key add - \ + && echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google-chrome.list \ + && apt-get update + +# Install Google Chrome +RUN apt-get install -y google-chrome-stable + +# Install specific version of ChromeDriver +ARG CHROMEDRIVER_VERSION=122.0.6261.94 +RUN wget -N https://storage.googleapis.com/chrome-for-testing-public/$CHROMEDRIVER_VERSION/linux64/chromedriver-linux64.zip -P ~/ \ + && unzip ~/chromedriver-linux64.zip -d ~/ \ + && rm ~/chromedriver-linux64.zip \ + && mv -f ~/chromedriver-linux64/chromedriver /usr/local/bin/chromedriver \ + && chown root:root /usr/local/bin/chromedriver \ + && chmod 0755 /usr/local/bin/chromedriver + +# Set display port to avoid crash +ENV DISPLAY=:99 + +# Upgrade pip +RUN pip install --upgrade pip # Set the working directory in the container WORKDIR /app @@ -11,4 +36,4 @@ WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -CMD ["python", "main.py"] \ No newline at end of file +CMD ["python", "main.py"] diff --git a/scraper/__init__.py b/scraper/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scraper/api.py b/scraper/api.py new file mode 100644 index 0000000..bc3bb0b --- /dev/null +++ b/scraper/api.py @@ -0,0 +1,249 @@ +import requests +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo +from exceptions import APIRequestError, InvalidDataTypeError, InvalidMethodError +from app_log import LoggingManager +from models import Post + + +class ApiRequestHandler: + """ + Handles API requests for the application. Supports basic HTTP methods: GET, POST, PUT, DELETE. + Utilizes the `requests` library to send requests to a specified API URL and handles + response validation and error logging. + + Attributes: + api_url (str): The base URL for the API to which requests are sent. + log_manager (LoggingManager): Manages logging for API request operations. 
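+
+    Example (illustrative; URL matches API_URL in config.py):
+        >>> handler = ApiRequestHandler("http://server:8000/api/")
+        >>> posts = handler.send_api_request("GET", f"{handler.api_url}posts/")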
+ """ + + def __init__(self, api_url: str): + self.api_url = api_url + self.log_manager = LoggingManager("scraper.log") + + def send_api_request( + self, method: str, api_url: str, data=None, params=None + ) -> dict: + """ + Sends a request to the API using the specified HTTP method, URL, and optional data and parameters. + + Parameters: + method (str): The HTTP method to use for the request. Must be one of: GET, POST, PUT, DELETE. + api_url (str): The URL endpoint to send the request to. + data (dict, optional): The payload to send in the request body. + params (dict, optional): The URL parameters to append to the request. + + Returns: + dict: The JSON response from the API. + + Raises: + InvalidMethodError: If the provided method is not supported. + InvalidDataTypeError: If `data` or `params` is provided but is not a dictionary. + APIRequestError: If the response from the API is not a success. + """ + if method not in ["GET", "POST", "PUT", "DELETE"]: + raise InvalidMethodError(f"Invalid method: {method}") + if data is not None and not isinstance(data, dict): + raise InvalidDataTypeError(f"Invalid data type: {type(data)} expected dict") + if params is not None and not isinstance(params, dict): + raise InvalidDataTypeError( + f"Invalid data type: {type(params)} expected dict" + ) + try: + response = requests.request(method, api_url, data=data, params=params) + except requests.RequestException as e: + self.log_manager.error(f"API request failed: {e}") + raise APIRequestError(0, str(e)) + success_codes = [200, 201, 204] + if response.status_code not in success_codes: + self.log_manager.error( + f"API request failed: {response.status_code} - {response.text}" + ) + raise APIRequestError(response.status_code, response.text) + return response.json() + + +class PostManager: + """ + Manages operations related to posts, including retrieval and insertion of post data into a database via API requests. + Utilizes an instance of ApiRequestHandler for API interactions and LoggingManager for logging operations. + + Attributes: + api_request_handler (ApiRequestHandler): Handles the API requests for interacting with post data. + log_manager (LoggingManager): Manages logging for operations performed by PostManager. + """ + + def __init__(self, api_request_handler: ApiRequestHandler): + """ + Initializes the PostManager with an API request handler for making API calls and a logging manager for logging. + + Parameters: + api_request_handler (ApiRequestHandler): The handler for making API requests. + """ + self.api_request_handler = api_request_handler + self.log_manager = LoggingManager("scraper.log") + + def get_post_by_reddit_id(self, reddit_id: str) -> dict: + """ + Retrieves a post by its Reddit ID from the database through an API call. + + Parameters: + reddit_id (str): The Reddit ID of the post to retrieve. + + Returns: + dict: The response from the API containing the post data. + """ + self.log_manager.log(f"Getting post by reddit id: {reddit_id}") + response = self.api_request_handler.send_api_request( + "GET", f"{self.api_request_handler.api_url}posts/?reddit_id={reddit_id}" + ) + return response + + def post_exists(self, reddit_id: str) -> bool: + """ + Checks if a post with the specified Reddit ID exists in the database. + + Parameters: + reddit_id (str): The Reddit ID of the post to check. + + Returns: + bool: True if the post exists, False otherwise. 
+ """ + self.log_manager.log(f"Checking if post exists: {reddit_id}") + response = self.get_post_by_reddit_id(reddit_id) + if len(response) == 0: + return False + return True + + def insert_post(self, post) -> dict: + """ + Inserts a new post into the database through an API call. + + Parameters: + post (Post): The Post object containing the data to insert. + + Returns: + dict: The response from the API after attempting to insert the post data. + """ + self.log_manager.log(f"Inserting post: {post.reddit_id}") + data = { + "reddit_id": post.reddit_id, + "title": post.title, + "name": post.name, + "url": post.url, + "created_utc": post.created_utc, + "selftext": post.selftext, + "permalink": post.permalink, + } + response = self.api_request_handler.send_api_request( + "POST", f"{self.api_request_handler.api_url}posts/", data=data + ) + return response + + def get_posts_from_last_7_days(self) -> dict: + """ + Retrieves posts from the last 7 days from the database through an API call. + + Returns: + dict: The response from the API containing the posts from the last 7 days. + """ + self.log_manager.log("Getting posts from last 7 days") + posts_from_last_7_days = self.api_request_handler.send_api_request( + "GET", f"{self.api_request_handler.api_url}posts/?last_7_days=1" + ) + return posts_from_last_7_days + + +class PostAnalyticsManager: + """ + Manages the analytics for posts by interfacing with the API to check for update requirements + and update post analytics. This class leverages the ApiRequestHandler for API interactions + and the PostManager for retrieving specific post information. + + Attributes: + api_request_handler (ApiRequestHandler): Handles API requests for analytics data. + post_manager (PostManager): Manages post retrieval and existence checks. + log_manager (LoggingManager): Manages logging for analytics operations. + """ + + def __init__( + self, api_request_handler: ApiRequestHandler, post_manager: PostManager + ): + """ + Initializes the PostAnalyticsManager with necessary handlers and managers. + + Parameters: + api_request_handler (ApiRequestHandler): The API request handler for making API calls. + post_manager (PostManager): The manager for interacting with post data. + """ + self.api_request_handler = api_request_handler + self.post_manager = post_manager + self.log_manager = LoggingManager("scraper.log") + + def check_update_requirements(self, reddit_id: str, update_frequency: int) -> bool: + """ + Checks if the post identified by the given reddit_id meets the requirements for an update + by analyzing the analytics data within the last x seconds (update_frequency). + + Parameters: + reddit_id (str): The Reddit ID of the post to check. + update_frequency (int): The frequency in seconds for updating post analytics. + + Returns: + bool: True if the post meets update requirements, False otherwise. 
+ """ + self.log_manager.log(f"Checking update requirements for {reddit_id}") + + # Specify your desired timezone, e.g., UTC + timezone = ZoneInfo("UTC") + + # Make your datetime objects timezone-aware + time_start = datetime.now(timezone) - timedelta(seconds=update_frequency) + now = datetime.now(timezone) + + # Format datetime objects for the API request + time_begin_str = time_start.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + time_end_str = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + + post_id = self.post_manager.get_post_by_reddit_id(reddit_id) + post_id = post_id[0]["id"] + self.log_manager.log( + f"{self.api_request_handler.api_url}post_analytics/?post={post_id}&time_begin={time_begin_str}&time_end={time_end_str}" + ) + + response = self.api_request_handler.send_api_request( + "GET", + f"{self.api_request_handler.api_url}post_analytics/?post={post_id}&time_begin={time_begin_str}&time_end={time_end_str}", + ) + + if len(response) > 0: + # post should not be updated + return False + + # post should be updated + return True + + def update_post_analytics(self, post: Post) -> dict: + """ + Updates the analytics for a given post with new data such as score, number of comments, + and upvote ratio. + + Parameters: + post (Post): The post object containing the new analytics data. + + Returns: + dict: The response from the API after updating the post's analytics. + """ + self.log_manager.log(f"Updating post analytics for {post.reddit_id}") + post_id = self.post_manager.get_post_by_reddit_id(post.reddit_id) + post_id = post_id[0]["id"] + data = { + "post": post_id, + "score": post.score, + "num_comments": post.num_comments, + "upvote_ratio": post.upvote_ratio, + } + response = self.api_request_handler.send_api_request( + "POST", f"{self.api_request_handler.api_url}post_analytics/", data=data + ) + return response diff --git a/scraper/app.py b/scraper/app.py index f0cbe0b..45f0d8f 100644 --- a/scraper/app.py +++ b/scraper/app.py @@ -1,406 +1,6 @@ -from datetime import datetime, timedelta -import requests -from models import Post -import praw -from zoneinfo import ZoneInfo -from exceptions import InvalidMethodError, InvalidDataTypeError, APIRequestError from app_log import LoggingManager -from threads import Scheduler, ThreadManager - - -class ApiRequestHandler: - """ - Handles API requests for the application. Supports basic HTTP methods: GET, POST, PUT, DELETE. - Utilizes the `requests` library to send requests to a specified API URL and handles - response validation and error logging. - - Attributes: - api_url (str): The base URL for the API to which requests are sent. - log_manager (LoggingManager): Manages logging for API request operations. - """ - - def __init__(self, api_url: str): - self.api_url = api_url - self.log_manager = LoggingManager("scraper.log") - - def send_api_request( - self, method: str, api_url: str, data=None, params=None - ) -> dict: - """ - Sends a request to the API using the specified HTTP method, URL, and optional data and parameters. - - Parameters: - method (str): The HTTP method to use for the request. Must be one of: GET, POST, PUT, DELETE. - api_url (str): The URL endpoint to send the request to. - data (dict, optional): The payload to send in the request body. - params (dict, optional): The URL parameters to append to the request. - - Returns: - dict: The JSON response from the API. - - Raises: - InvalidMethodError: If the provided method is not supported. - InvalidDataTypeError: If `data` or `params` is provided but is not a dictionary. 
- APIRequestError: If the response from the API is not a success. - """ - if method not in ["GET", "POST", "PUT", "DELETE"]: - raise InvalidMethodError(f"Invalid method: {method}") - if data is not None and not isinstance(data, dict): - raise InvalidDataTypeError(f"Invalid data type: {type(data)} expected dict") - if params is not None and not isinstance(params, dict): - raise InvalidDataTypeError( - f"Invalid data type: {type(params)} expected dict" - ) - try: - response = requests.request(method, api_url, data=data, params=params) - except requests.RequestException as e: - self.log_manager.error(f"API request failed: {e}") - raise APIRequestError(0, str(e)) - success_codes = [200, 201, 204] - if response.status_code not in success_codes: - self.log_manager.error( - f"API request failed: {response.status_code} - {response.text}" - ) - raise APIRequestError(response.status_code, response.text) - return response.json() - - -class PostManager: - """ - Manages operations related to posts, including retrieval and insertion of post data into a database via API requests. - Utilizes an instance of ApiRequestHandler for API interactions and LoggingManager for logging operations. - - Attributes: - api_request_handler (ApiRequestHandler): Handles the API requests for interacting with post data. - log_manager (LoggingManager): Manages logging for operations performed by PostManager. - """ - - def __init__(self, api_request_handler: ApiRequestHandler): - """ - Initializes the PostManager with an API request handler for making API calls and a logging manager for logging. - - Parameters: - api_request_handler (ApiRequestHandler): The handler for making API requests. - """ - self.api_request_handler = api_request_handler - self.log_manager = LoggingManager("scraper.log") - - def get_post_by_reddit_id(self, reddit_id: str) -> dict: - """ - Retrieves a post by its Reddit ID from the database through an API call. - - Parameters: - reddit_id (str): The Reddit ID of the post to retrieve. - - Returns: - dict: The response from the API containing the post data. - """ - self.log_manager.log(f"Getting post by reddit id: {reddit_id}") - response = self.api_request_handler.send_api_request( - "GET", f"{self.api_request_handler.api_url}posts/?reddit_id={reddit_id}" - ) - return response - - def post_exists(self, reddit_id: str) -> bool: - """ - Checks if a post with the specified Reddit ID exists in the database. - - Parameters: - reddit_id (str): The Reddit ID of the post to check. - - Returns: - bool: True if the post exists, False otherwise. - """ - self.log_manager.log(f"Checking if post exists: {reddit_id}") - response = self.get_post_by_reddit_id(reddit_id) - if len(response) == 0: - return False - return True - - def insert_post(self, post) -> dict: - """ - Inserts a new post into the database through an API call. - - Parameters: - post (Post): The Post object containing the data to insert. - - Returns: - dict: The response from the API after attempting to insert the post data. 
- """ - self.log_manager.log(f"Inserting post: {post.reddit_id}") - data = { - "reddit_id": post.reddit_id, - "title": post.title, - "name": post.name, - "url": post.url, - "created_utc": post.created_utc, - "selftext": post.selftext, - "permalink": post.permalink, - } - response = self.api_request_handler.send_api_request( - "POST", f"{self.api_request_handler.api_url}posts/", data=data - ) - return response - - def get_posts_from_last_7_days(self) -> dict: - """ - Retrieves posts from the last 7 days from the database through an API call. - - Returns: - dict: The response from the API containing the posts from the last 7 days. - """ - self.log_manager.log("Getting posts from last 7 days") - posts_from_last_7_days = self.api_request_handler.send_api_request( - "GET", f"{self.api_request_handler.api_url}posts/?last_7_days=1" - ) - return posts_from_last_7_days - - -class PostAnalyticsManager: - """ - Manages the analytics for posts by interfacing with the API to check for update requirements - and update post analytics. This class leverages the ApiRequestHandler for API interactions - and the PostManager for retrieving specific post information. - - Attributes: - api_request_handler (ApiRequestHandler): Handles API requests for analytics data. - post_manager (PostManager): Manages post retrieval and existence checks. - log_manager (LoggingManager): Manages logging for analytics operations. - """ - - def __init__( - self, api_request_handler: ApiRequestHandler, post_manager: PostManager - ): - """ - Initializes the PostAnalyticsManager with necessary handlers and managers. - - Parameters: - api_request_handler (ApiRequestHandler): The API request handler for making API calls. - post_manager (PostManager): The manager for interacting with post data. - """ - self.api_request_handler = api_request_handler - self.post_manager = post_manager - self.log_manager = LoggingManager("scraper.log") - - def check_update_requirements(self, reddit_id: str, update_frequency: int) -> bool: - """ - Checks if the post identified by the given reddit_id meets the requirements for an update - by analyzing the analytics data within the last x seconds (update_frequency). - - Parameters: - reddit_id (str): The Reddit ID of the post to check. - update_frequency (int): The frequency in seconds for updating post analytics. - - Returns: - bool: True if the post meets update requirements, False otherwise. 
- """ - self.log_manager.log(f"Checking update requirements for {reddit_id}") - - # Specify your desired timezone, e.g., UTC - timezone = ZoneInfo("UTC") - - # Make your datetime objects timezone-aware - time_start = datetime.now(timezone) - timedelta(seconds=update_frequency) - now = datetime.now(timezone) - - # Format datetime objects for the API request - time_begin_str = time_start.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" - time_end_str = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" - - post_id = self.post_manager.get_post_by_reddit_id(reddit_id) - post_id = post_id[0]["id"] - self.log_manager.log( - f"{self.api_request_handler.api_url}post_analytics/?post={post_id}&time_begin={time_begin_str}&time_end={time_end_str}" - ) - - response = self.api_request_handler.send_api_request( - "GET", - f"{self.api_request_handler.api_url}post_analytics/?post={post_id}&time_begin={time_begin_str}&time_end={time_end_str}", - ) - - if len(response) > 0: - # post should not be updated - return False - - # post should be updated - return True - - def update_post_analytics(self, post: Post) -> dict: - """ - Updates the analytics for a given post with new data such as score, number of comments, - and upvote ratio. - - Parameters: - post (Post): The post object containing the new analytics data. - - Returns: - dict: The response from the API after updating the post's analytics. - """ - self.log_manager.log(f"Updating post analytics for {post.reddit_id}") - post_id = self.post_manager.get_post_by_reddit_id(post.reddit_id) - post_id = post_id[0]["id"] - data = { - "post": post_id, - "score": post.score, - "num_comments": post.num_comments, - "upvote_ratio": post.upvote_ratio, - } - response = self.api_request_handler.send_api_request( - "POST", f"{self.api_request_handler.api_url}post_analytics/", data=data - ) - return response - - -class RedditMonitor: - """ - Monitors Reddit submissions for a specific subreddit, streaming new submissions and - updating existing ones. Utilizes PRAW (Python Reddit API Wrapper) to interact with Reddit's API. - - Attributes: - reddit (praw.Reddit): An instance of the PRAW Reddit class for API interactions. - subreddit (praw.models.Subreddit): The subreddit object for the specified subreddit. - log_manager (LoggingManager): Manages logging for Reddit monitoring operations. - """ - - def __init__( - self, client_id, client_secret, user_agent, username, password, subreddit_name - ): - """ - Initializes the RedditMonitor with credentials for Reddit API access and the target subreddit. - - Parameters: - client_id (str): The client ID for the Reddit API application. - client_secret (str): The client secret for the Reddit API application. - user_agent (str): The user agent string identifying the application to Reddit. - username (str): The Reddit account username for authentication. - password (str): The Reddit account password for authentication. - subreddit_name (str): The name of the subreddit to monitor. - """ - self.reddit = praw.Reddit( - client_id=client_id, - client_secret=client_secret, - user_agent=user_agent, - username=username, - password=password, - ) - self.subreddit = self.reddit.subreddit(subreddit_name) - self.log_manager = LoggingManager("scraper.log") - - def stream_submissions(self): - """ - Streams new submissions from the specified subreddit, yielding each submission - as it becomes available. - - Yields: - praw.models.Submission: A submission object representing a Reddit post. 
- """ - self.log_manager.info("Starting submission stream") - for submission in self.subreddit.stream.submissions(): - yield submission - - def update_submissions(self, posts_to_update): - """ - Retrieves and yields submissions corresponding to a list of posts that need to be updated, - identified by their Reddit IDs. - - Parameters: - posts_to_update (list of dict): A list of dictionaries, each containing the 'reddit_id' of a post to update. - - Yields: - praw.models.Submission: A submission object for each post that needs to be updated. - """ - self.log_manager.info("Updating submissions") - for post in posts_to_update: - submission = self.reddit.submission(id=post["reddit_id"]) - yield submission - - -class SubmissionManager: - """ - Manages the processing of Reddit submissions, including conversion to post objects, - checking for updates, and notifying via webhook. It integrates closely with RedditMonitor, - PostManager, and PostAnalyticsManager to streamline the handling of new and existing submissions. - - Attributes: - reddit_monitor (RedditMonitor): Monitors and streams Reddit submissions. - post_manager (PostManager): Manages post data interactions. - post_analytics_manager (PostAnalyticsManager): Manages post analytics data. - webhook_notifier (WebhookNotifier): Handles notifications for new or updated posts. - log_manager (LoggingManager): Manages logging for submission processing operations. - """ - - def __init__( - self, - reddit_monitor: RedditMonitor, - post_manager: PostManager, - post_analytics_manager: PostAnalyticsManager, - WebhookNotifier, - ): - """ - Initializes the SubmissionManager with necessary components for processing submissions. - - Parameters: - reddit_monitor (RedditMonitor): The component for monitoring Reddit submissions. - post_manager (PostManager): The component for managing post data. - post_analytics_manager (PostAnalyticsManager): The component for managing post analytics. - WebhookNotifier: The component for sending notifications about posts. - """ - self.reddit_monitor = reddit_monitor - self.post_manager = post_manager - self.post_analytics_manager = post_analytics_manager - self.webhook_notifier = WebhookNotifier - self.log_manager = LoggingManager("scraper.log") - - def convert_submission_to_post(self, submission): - """ - Converts a Reddit submission object into a Post object suitable for database insertion - or analytics processing. - - Parameters: - submission (praw.models.Submission): The Reddit submission to convert. - - Returns: - Post: A Post object populated with data from the Reddit submission. - """ - post = Post( - reddit_id=submission.id, - title=submission.title, - name=submission.name, - url=submission.url, - score=submission.score, - num_comments=submission.num_comments, - created_utc=submission.created_utc, - selftext=submission.selftext, - permalink=submission.permalink, - upvote_ratio=submission.upvote_ratio, - ) - return post - - def process_submissions(self, submissions, update_frequency=None): - """ - Processes a stream of Reddit submissions, checking for their existence, updating analytics, - and notifying via webhook if necessary. Optionally respects an update frequency to limit updates. - - Parameters: - submissions (Iterable[praw.models.Submission]): An iterable of Reddit submission objects to process. - update_frequency (int, optional): The minimum frequency in seconds to update a post's analytics. 
- """ - for submission in submissions: - self.log_manager.log(submission) - if self.post_manager.post_exists(submission.id): - self.log_manager.log("Post exists") - self.log_manager.log(f"post id: {submission.id}") - if self.post_analytics_manager.check_update_requirements( - submission.id, update_frequency - ): - self.log_manager.log("Update requirements met") - post = self.convert_submission_to_post(submission) - self.post_analytics_manager.update_post_analytics(post) - else: - post = self.convert_submission_to_post(submission) - self.post_manager.insert_post(post) - self.post_analytics_manager.update_post_analytics(post) - self.webhook_notifier.send_notification(post) +from threads import Scheduler +from costco import CostcoMonitor class Application: @@ -430,7 +30,6 @@ class Application: post_manager, post_analytics_manager, submission_manager, - update_frequency, ): """ Initializes the application with all necessary components. @@ -451,31 +50,42 @@ class Application: self.post_analytics_manager = post_analytics_manager self.log_manager = LoggingManager("scraper.log") self.submission_manager = submission_manager - self.scheduler = None - self.thread_manager = None + self.scheduler = Scheduler() # how often should post analytics be updated (call for update and database update are separate) - self.update_frequency = update_frequency + self.update_analytics_frequency = 60 * 15 + self.scrape_costco_frequency = 60 * 60 - def periodic_update(self, update_frequency): + def update_analytics(self): """ - Executes periodic updates for post analytics based on a predefined frequency. - - Parameters: - update_frequency (int): The frequency, in seconds, at which to perform updates. + Executes periodic updates for post analytics based on the predefined frequency. """ - self.log_manager.info("Running periodic update") + self.log_manager.info("Running periodic analytics update") to_be_updated = self.post_manager.get_posts_from_last_7_days() submissions = self.reddit_monitor.update_submissions(to_be_updated) - self.submission_manager.process_submissions(submissions, update_frequency) + self.submission_manager.process_submissions(submissions, self.update_analytics_frequency) + + def scrape_costco(self): + """ + Executes periodic updates for costco products based on the predefined frequency. + """ + self.log_manager.info("Running periodic costco scrape") + costco_monitor = CostcoMonitor("https://www.costco.com/CatalogSearch?dept=All&keyword=pokemon") + products = costco_monitor.get_products() + costco_monitor.close() + self.log_manager.info(f"Found {len(products)} products on the page") + self.log_manager.info(products) + self.webhook_notifier.costco_notification(products) - def run_periodic_update(self): + def add_scheduler_task(self, name, task, interval): """ - Initializes and runs the scheduler for periodic updates. + Registers a task with the scheduler to be run at a specified interval. + + Parameters: + name (str): Name of the task. + task (callable): The task function to be executed. + interval (int): The frequency in seconds at which the task should be executed. """ - self.scheduler = Scheduler( - self.update_frequency, lambda: self.periodic_update(self.update_frequency) - ) - self.scheduler.run() + self.scheduler.add_task(name, task, interval) def run(self): """ @@ -483,7 +93,11 @@ class Application: and processing submissions. 
""" self.log_manager.info("Application started") - self.thread_manager = ThreadManager(target=self.run_periodic_update, args=()) - self.thread_manager.run() + + # tasks + self.add_scheduler_task("update_analytics", self.update_analytics, self.update_analytics_frequency) + self.add_scheduler_task("scrape_costco", self.scrape_costco, self.scrape_costco_frequency) + + # Stream submissions and process them submissions = self.reddit_monitor.stream_submissions() - self.submission_manager.process_submissions(submissions, self.update_frequency) + self.submission_manager.process_submissions(submissions, self.update_analytics_frequency) diff --git a/scraper/config.py b/scraper/config.py index edf951f..ee15e4a 100644 --- a/scraper/config.py +++ b/scraper/config.py @@ -12,4 +12,3 @@ class Config: USER_AGENT = "praw:zman.video_repost_bot:v0.1.0 (by u/jzman21)" DISABLE_WEBHOOK = False API_URL = "http://server:8000/api/" - UPDATE_FREQUENCY = 60 * 15 # 15 minutes diff --git a/scraper/costco.py b/scraper/costco.py new file mode 100644 index 0000000..cc1c906 --- /dev/null +++ b/scraper/costco.py @@ -0,0 +1,80 @@ +from selenium import webdriver +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.common.by import By +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.common.exceptions import TimeoutException +from webdriver_manager.chrome import ChromeDriverManager +from app_log import LoggingManager + + +class CostcoMonitor: + def __init__(self, url): + self.url = url + chrome_options = Options() + chrome_options.add_argument("--headless") # Remove this line if you want to see the browser + chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3") + chrome_options.add_argument("--window-size=1920,1080") + chrome_options.add_argument("--log-level=3") + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + self.driver = webdriver.Chrome(options=chrome_options) + self.log_manager = LoggingManager("scraper.log") + + def wait_for_page_load(self): + try: + WebDriverWait(self.driver, 20).until( + lambda driver: driver.execute_script("return document.readyState") == "complete" + ) + except TimeoutException: + self.log_manager.error("Timed out waiting for page to load") + + def get_products(self): + self.log_manager.info(f"Loading Costco page: {self.url}") + self.driver.get(self.url) + self.wait_for_page_load() # Wait for the page to fully load + + # Wait for the product list to be visible on the page + WebDriverWait(self.driver, 20).until( + EC.visibility_of_element_located((By.CSS_SELECTOR, "div.product-list.grid")) + ) + + products = self.driver.find_elements(By.CSS_SELECTOR, "div.col-xs-6.col-lg-4.col-xl-3.product") + self.log_manager.info(f"Found {len(products)} products on the page") + + product_detail_list = [] + + for product in products: + try: + product_sku = product.find_element(By.CSS_SELECTOR, "input[id^='product_sku_']").get_attribute('value') + product_name = product.find_element(By.CSS_SELECTOR, "input[id^='product_name_']").get_attribute('value') + price_element = product.find_element(By.CSS_SELECTOR, "div[class*='price']") + price = price_element.text if price_element else "Price not found" + img_element = product.find_element(By.CSS_SELECTOR, "a.product-image-url img.img-responsive") + 
img_url = img_element.get_attribute('src') if img_element else "Image URL not found" + product_link_element = product.find_element(By.CSS_SELECTOR, "a.product-image-url") + product_link = product_link_element.get_attribute('href') if product_link_element else "Product link not found" + product_detail_list.append({ + "sku": product_sku, + "name": product_name, + "price": price, + "img_url": img_url, + "product_link": product_link + }) + self.log_manager.log(f"SKU: {product_sku}, Name: {product_name}, Price: {price}, Image URL: {img_url}, Product Link: {product_link}") + + except Exception as e: + self.log_manager.error(f"Error processing product: {e}") + + return product_detail_list + + def close(self): + self.driver.quit() + self.log_manager.info("Browser closed") + +if __name__ == "__main__": + url = "https://www.costco.com/CatalogSearch?dept=All&keyword=pokemon" + monitor = CostcoMonitor(url) + monitor.get_products() + monitor.close() diff --git a/scraper/main.py b/scraper/main.py index 5c2b0cb..3da14e1 100644 --- a/scraper/main.py +++ b/scraper/main.py @@ -1,12 +1,7 @@ from webhook import WebhookNotifier -from app import ( - Application, - RedditMonitor, - ApiRequestHandler, - PostManager, - PostAnalyticsManager, - SubmissionManager, -) +from app import Application +from api import ApiRequestHandler, PostManager, PostAnalyticsManager +from reddit import RedditMonitor, SubmissionManager from config import Config from app_log import LoggingManager @@ -23,7 +18,6 @@ if __name__ == "__main__": disable_webhook = Config.DISABLE_WEBHOOK pkmn_env = Config.PKMN_ENV api_url = Config.API_URL - update_frequency = Config.UPDATE_FREQUENCY reddit_monitor = RedditMonitor( client_id, client_secret, user_agent, username, password, subreddit_name @@ -42,7 +36,6 @@ if __name__ == "__main__": post_manager, post_analytics_manager, submission_manager, - update_frequency, ) app.run() diff --git a/scraper/reddit.py b/scraper/reddit.py new file mode 100644 index 0000000..cabcda4 --- /dev/null +++ b/scraper/reddit.py @@ -0,0 +1,156 @@ +import praw +from app_log import LoggingManager +from models import Post +from api import PostManager, PostAnalyticsManager + + +class RedditMonitor: + """ + Monitors Reddit submissions for a specific subreddit, streaming new submissions and + updating existing ones. Utilizes PRAW (Python Reddit API Wrapper) to interact with Reddit's API. + + Attributes: + reddit (praw.Reddit): An instance of the PRAW Reddit class for API interactions. + subreddit (praw.models.Subreddit): The subreddit object for the specified subreddit. + log_manager (LoggingManager): Manages logging for Reddit monitoring operations. + """ + + def __init__( + self, client_id, client_secret, user_agent, username, password, subreddit_name + ): + """ + Initializes the RedditMonitor with credentials for Reddit API access and the target subreddit. + + Parameters: + client_id (str): The client ID for the Reddit API application. + client_secret (str): The client secret for the Reddit API application. + user_agent (str): The user agent string identifying the application to Reddit. + username (str): The Reddit account username for authentication. + password (str): The Reddit account password for authentication. + subreddit_name (str): The name of the subreddit to monitor. 
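+
+        Note:
+            Supplying a username and password uses Reddit's script-app
+            (password grant) OAuth flow, so the credentials must belong to
+            the account that registered the client_id/client_secret pair.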
+ """ + self.reddit = praw.Reddit( + client_id=client_id, + client_secret=client_secret, + user_agent=user_agent, + username=username, + password=password, + ) + self.subreddit = self.reddit.subreddit(subreddit_name) + self.log_manager = LoggingManager("scraper.log") + + def stream_submissions(self): + """ + Streams new submissions from the specified subreddit, yielding each submission + as it becomes available. + + Yields: + praw.models.Submission: A submission object representing a Reddit post. + """ + self.log_manager.info("Starting submission stream") + for submission in self.subreddit.stream.submissions(): + yield submission + + def update_submissions(self, posts_to_update): + """ + Retrieves and yields submissions corresponding to a list of posts that need to be updated, + identified by their Reddit IDs. + + Parameters: + posts_to_update (list of dict): A list of dictionaries, each containing the 'reddit_id' of a post to update. + + Yields: + praw.models.Submission: A submission object for each post that needs to be updated. + """ + self.log_manager.info("Updating submissions") + for post in posts_to_update: + submission = self.reddit.submission(id=post["reddit_id"]) + yield submission + + +class SubmissionManager: + """ + Manages the processing of Reddit submissions, including conversion to post objects, + checking for updates, and notifying via webhook. It integrates closely with RedditMonitor, + PostManager, and PostAnalyticsManager to streamline the handling of new and existing submissions. + + Attributes: + reddit_monitor (RedditMonitor): Monitors and streams Reddit submissions. + post_manager (PostManager): Manages post data interactions. + post_analytics_manager (PostAnalyticsManager): Manages post analytics data. + webhook_notifier (WebhookNotifier): Handles notifications for new or updated posts. + log_manager (LoggingManager): Manages logging for submission processing operations. + """ + + def __init__( + self, + reddit_monitor: RedditMonitor, + post_manager: PostManager, + post_analytics_manager: PostAnalyticsManager, + WebhookNotifier, + ): + """ + Initializes the SubmissionManager with necessary components for processing submissions. + + Parameters: + reddit_monitor (RedditMonitor): The component for monitoring Reddit submissions. + post_manager (PostManager): The component for managing post data. + post_analytics_manager (PostAnalyticsManager): The component for managing post analytics. + WebhookNotifier: The component for sending notifications about posts. + """ + self.reddit_monitor = reddit_monitor + self.post_manager = post_manager + self.post_analytics_manager = post_analytics_manager + self.webhook_notifier = WebhookNotifier + self.log_manager = LoggingManager("scraper.log") + + def convert_submission_to_post(self, submission): + """ + Converts a Reddit submission object into a Post object suitable for database insertion + or analytics processing. + + Parameters: + submission (praw.models.Submission): The Reddit submission to convert. + + Returns: + Post: A Post object populated with data from the Reddit submission. 
+ """ + post = Post( + reddit_id=submission.id, + title=submission.title, + name=submission.name, + url=submission.url, + score=submission.score, + num_comments=submission.num_comments, + created_utc=submission.created_utc, + selftext=submission.selftext, + permalink=submission.permalink, + upvote_ratio=submission.upvote_ratio, + ) + return post + + def process_submissions(self, submissions, update_frequency=None): + """ + Processes a stream of Reddit submissions, checking for their existence, updating analytics, + and notifying via webhook if necessary. Optionally respects an update frequency to limit updates. + + Parameters: + submissions (Iterable[praw.models.Submission]): An iterable of Reddit submission objects to process. + update_frequency (int, optional): The minimum frequency in seconds to update a post's analytics. + """ + for submission in submissions: + self.log_manager.log(submission) + if self.post_manager.post_exists(submission.id): + self.log_manager.log("Post exists") + self.log_manager.log(f"post id: {submission.id}") + if self.post_analytics_manager.check_update_requirements( + submission.id, update_frequency + ): + self.log_manager.log("Update requirements met") + post = self.convert_submission_to_post(submission) + self.post_analytics_manager.update_post_analytics(post) + else: + post = self.convert_submission_to_post(submission) + self.post_manager.insert_post(post) + self.post_analytics_manager.update_post_analytics(post) + self.webhook_notifier.send_notification(post) diff --git a/scraper/requirements.txt b/scraper/requirements.txt index da03cbe4427d30afb4c0c11284f47f67f1c689a4..75acbbe05c4b09505b530f86e0ce945307c17991 100644 GIT binary patch literal 1416 zcmY*ZO;5s55ZtqgKLtVyeq215c=Vv*2bfA9P{C4K5aGwGGdq1eOaq$k?(EF$F26s8 zb+)j|S_?L}2RvgN+8sW*UE7rvw!_mRi?Q6JIie$vzN4zJJu)g(?`@5|4gNMZ@%#xg zGgJ*xN3*S@H;xyhxY@V@f}H5;b!*mv!{71EV==3-AecaGaI* zQw^)7BG|Wk(YO~IXVxO0&PCP9YF@@ zH`O64*cU#Uz|d}dnp1nVXL|%QUPl10<2i%cbGRjDTjV6KiNZKLO+15_UV&4L=rXkj zF*d+v2YUxo%_MH5T&7m}xdV&FbuH`-72o|_sDi4~MP<$+b~(Wp{_1FFbHv0#sJsxx z1Ewv#+zfgbS!)`?+vEKjPX)%~vn$*|;YmyBy6J2SRQ{UqW)^2u&EkK}rG7GD4{nC; UlRj`q#o_?9yan!N+KF`Gf8mqSegFUf delta 40 ycmV+@0N4MB3*-fmB9U+?k+j5JOwOH)DLg~ diff --git a/scraper/threads.py b/scraper/threads.py index bf2d44b..e123d2c 100644 --- a/scraper/threads.py +++ b/scraper/threads.py @@ -1,26 +1,53 @@ import threading - +import time class Scheduler: - def __init__(self, interval, function): - self.interval = interval - self.function = function - self.stop_event = threading.Event() + def __init__(self): + self.tasks = {} - def run(self): - while not self.stop_event.wait(self.interval): - self.function() + def add_task(self, task_name, function, interval): + """ + Adds a new task to the scheduler. - def stop(self): - self.stop_event.set() + Parameters: + task_name (str): Unique name for the task. + function (callable): The function to run for this task. + interval (int): Time in seconds between each execution of the task. + """ + task = { + "interval": interval, + "function": function, + "stop_event": threading.Event(), + "thread": threading.Thread(target=self.run_task, args=(task_name,), daemon=True) + } + self.tasks[task_name] = task + task["thread"].start() + def run_task(self, task_name): + """ + Executes the task in a loop until its stop event is set. -class ThreadManager: - def __init__(self, target, args: tuple = ()) -> None: - self.target = target - self.args = args + Parameters: + task_name (str): The name of the task to run. 
+ """ + task = self.tasks[task_name] + while not task["stop_event"].is_set(): + task["function"]() + task["stop_event"].wait(task["interval"]) - def run(self): - thread = threading.Thread(target=self.target, args=self.args) - thread.daemon = True - thread.start() + def stop_task(self, task_name): + """ + Stops the specified task. + + Parameters: + task_name (str): The name of the task to stop. + """ + if task_name in self.tasks: + self.tasks[task_name]["stop_event"].set() + + def stop_all_tasks(self): + """ + Stops all tasks managed by the scheduler. + """ + for task_name in self.tasks.keys(): + self.stop_task(task_name) diff --git a/scraper/webhook.py b/scraper/webhook.py index 359f2e9..025e349 100644 --- a/scraper/webhook.py +++ b/scraper/webhook.py @@ -25,3 +25,24 @@ class WebhookNotifier: requests.post(self.webhook_url, data={"content": content}) except Exception as e: self.log_manager.error(f"Failed to send notification: {e}") + + def costco_notification(self, data): + for product in data: + sku = product.get("sku") + name = product.get("name") + price = product.get("price") + img_url = product.get("img_url") + product_link = product.get("product_link") + + content = f""" +**Costco has a new item!** +**Name:** {name} +**Price:** {price} +**Link:** {product_link} +{img_url}""" + if not self.disable_webhook: + self.log_manager.log(f"Sending notification to {self.webhook_url}") + try: + requests.post(self.webhook_url, data={"content": content}) + except Exception as e: + self.log_manager.error(f"Failed to send notification: {e}") \ No newline at end of file