from datetime import datetime, timedelta import requests from models import Post import praw from zoneinfo import ZoneInfo from exceptions import InvalidMethodError, InvalidDataTypeError, APIRequestError from app_log import LoggingManager from threads import Scheduler, ThreadManager class ApiRequestHandler: def __init__(self, api_url: str): self.api_url = api_url self.log_manager = LoggingManager("scraper.log") def send_api_request( self, method: str, api_url: str, data=None, params=None ) -> dict: if method not in ["GET", "POST", "PUT", "DELETE"]: raise InvalidMethodError(f"Invalid method: {method}") if data is not None and not isinstance(data, dict): raise InvalidDataTypeError(f"Invalid data type: {type(data)} expected dict") if params is not None and not isinstance(params, dict): raise InvalidDataTypeError( f"Invalid data type: {type(params)} expected dict" ) response = requests.request(method, api_url, data=data, params=params) success_codes = [200, 201, 204] if response.status_code not in success_codes: self.log_manager.error( f"API request failed: {response.status_code} - {response.text}" ) raise APIRequestError(response.status_code, response.text) return response.json() class PostManager: def __init__(self, api_request_handler: ApiRequestHandler): self.api_request_handler = api_request_handler self.log_manager = LoggingManager("scraper.log") def get_post_by_reddit_id(self, reddit_id: str) -> dict: self.log_manager.log(f"Getting post by reddit id: {reddit_id}") response = self.api_request_handler.send_api_request( "GET", f"{self.api_request_handler.api_url}posts/?reddit_id={reddit_id}" ) return response def post_exists(self, reddit_id: str) -> bool: self.log_manager.log(f"Checking if post exists: {reddit_id}") response = self.get_post_by_reddit_id(reddit_id) if len(response) == 0: return False return True def insert_post(self, post) -> dict: self.log_manager.log(f"Inserting post: {post.reddit_id}") self.post = post data = { "reddit_id": self.post.reddit_id, "title": self.post.title, "name": self.post.name, "url": self.post.url, "created_utc": self.post.created_utc, "selftext": self.post.selftext, "permalink": self.post.permalink, } response = self.api_request_handler.send_api_request( "POST", f"{self.api_request_handler.api_url}posts/", data=data ) return response def get_posts_from_last_7_days(self) -> dict: self.log_manager.log("Getting posts from last 7 days") posts_from_last_7_days = self.api_request_handler.send_api_request( "GET", f"{self.api_request_handler.api_url}posts/?last_7_days=1" ) return posts_from_last_7_days class PostAnalyticsManager: def __init__( self, api_request_handler: ApiRequestHandler, post_manager: PostManager ): self.api_request_handler = api_request_handler self.post_manager = post_manager self.log_manager = LoggingManager("scraper.log") def check_update_requirements(self, reddit_id: str) -> bool: self.log_manager.log(f"Checking update requirements for {reddit_id}") # Specify your desired timezone, e.g., UTC timezone = ZoneInfo("UTC") # Make your datetime objects timezone-aware fifteen_minutes_ago = datetime.now(timezone) - timedelta(minutes=15) now = datetime.now(timezone) # Format datetime objects for the API request time_begin_str = fifteen_minutes_ago.isoformat(timespec="seconds") time_end_str = now.isoformat(timespec="seconds") post_id = self.post_manager.get_post_by_reddit_id(reddit_id) post_id = post_id[0]["id"] self.log_manager.log( f"{self.api_request_handler.api_url}post_analytics/?post={post_id}&time_begin={time_begin_str}&time_end={time_end_str}" ) response = self.api_request_handler.send_api_request( "GET", f"{self.api_request_handler.api_url}post_analytics/?post={post_id}&time_begin={time_begin_str}&time_end={time_end_str}", ) if len(response) > 0: # post should not be updated return False # post should be updated return True def update_post_analytics(self, post: Post) -> dict: self.log_manager.log(f"Updating post analytics for {post.reddit_id}") post_id = self.post_manager.get_post_by_reddit_id(post.reddit_id) post_id = post_id[0]["id"] data = { "post": post_id, "score": post.score, "num_comments": post.num_comments, "upvote_ratio": post.upvote_ratio, } response = self.api_request_handler.send_api_request( "POST", f"{self.api_request_handler.api_url}post_analytics/", data=data ) return response class RedditMonitor: def __init__( self, client_id, client_secret, user_agent, username, password, subreddit_name ): self.reddit = praw.Reddit( client_id=client_id, client_secret=client_secret, user_agent=user_agent, username=username, password=password, ) self.subreddit = self.reddit.subreddit(subreddit_name) self.log_manager = LoggingManager("scraper.log") def stream_submissions(self): self.log_manager.info("Starting submission stream") for submission in self.subreddit.stream.submissions(): yield submission def update_submissions(self, posts_to_update): self.log_manager.info("Updating submissions") for post in posts_to_update: submission = self.reddit.submission(id=post["reddit_id"]) yield submission class SubmissionManager: def __init__( self, reddit_monitor: RedditMonitor, post_manager: PostManager, post_analytics_manager: PostAnalyticsManager, WebhookNotifier, ): self.reddit_monitor = reddit_monitor self.post_manager = post_manager self.post_analytics_manager = post_analytics_manager self.webhook_notifier = WebhookNotifier self.log_manager = LoggingManager("scraper.log") def convert_submission_to_post(self, submission): post = Post( reddit_id=submission.id, title=submission.title, name=submission.name, url=submission.url, score=submission.score, num_comments=submission.num_comments, created_utc=submission.created_utc, selftext=submission.selftext, permalink=submission.permalink, upvote_ratio=submission.upvote_ratio, ) return post def process_submissions(self, submissions): for submission in submissions: self.log_manager.log(submission) if self.post_manager.post_exists(submission.id): self.log_manager.log("Post exists") self.log_manager.log(f"post id: {submission.id}") if self.post_analytics_manager.check_update_requirements(submission.id): self.log_manager.log("Update requirements met") post = self.convert_submission_to_post(submission) self.post_analytics_manager.update_post_analytics(post) else: post = self.convert_submission_to_post(submission) self.post_manager.insert_post(post) self.post_analytics_manager.update_post_analytics(post) self.webhook_notifier.send_notification(post) class Application: def __init__( self, reddit_monitor, webhook_notifier, api_conn, post_manager, post_analytics_manager, submission_manager, ): self.reddit_monitor = reddit_monitor self.webhook_notifier = webhook_notifier self.api_conn = api_conn self.post_manager = post_manager self.post_analytics_manager = post_analytics_manager self.log_manager = LoggingManager("scraper.log") self.submission_manager = submission_manager self.scheduler = None self.thread_manager = None def periodic_update(self): self.log_manager.info("Running periodic update") to_be_updated = self.post_manager.get_posts_from_last_7_days() submissions = self.reddit_monitor.update_submissions(to_be_updated) self.submission_manager.process_submissions(submissions) def run_periodic_update(self, interval): self.scheduler = Scheduler(interval, self.periodic_update) self.scheduler.run() def run(self): self.log_manager.info("Application started") update_frequency = 60 * 15 # 15 minutes in seconds self.thread_manager = ThreadManager( target=self.run_periodic_update, args=(update_frequency,) ) self.thread_manager.run() submissions = self.reddit_monitor.stream_submissions() self.submission_manager.process_submissions(submissions)