pokemans/scraper/app.py

483 lines
20 KiB
Python

from datetime import datetime, timedelta
import requests
from models import Post
import praw
from zoneinfo import ZoneInfo
from exceptions import InvalidMethodError, InvalidDataTypeError, APIRequestError
from app_log import LoggingManager
from threads import Scheduler, ThreadManager
class ApiRequestHandler:
"""
Handles API requests for the application. Supports basic HTTP methods: GET, POST, PUT, DELETE.
Utilizes the `requests` library to send requests to a specified API URL and handles
response validation and error logging.
Attributes:
api_url (str): The base URL for the API to which requests are sent.
log_manager (LoggingManager): Manages logging for API request operations.
"""
def __init__(self, api_url: str):
self.api_url = api_url
self.log_manager = LoggingManager("scraper.log")
def send_api_request(
self, method: str, api_url: str, data=None, params=None
) -> dict:
"""
Sends a request to the API using the specified HTTP method, URL, and optional data and parameters.
Parameters:
method (str): The HTTP method to use for the request. Must be one of: GET, POST, PUT, DELETE.
api_url (str): The URL endpoint to send the request to.
data (dict, optional): The payload to send in the request body.
params (dict, optional): The URL parameters to append to the request.
Returns:
dict: The JSON response from the API.
Raises:
InvalidMethodError: If the provided method is not supported.
InvalidDataTypeError: If `data` or `params` is provided but is not a dictionary.
APIRequestError: If the response from the API is not a success.
"""
if method not in ["GET", "POST", "PUT", "DELETE"]:
raise InvalidMethodError(f"Invalid method: {method}")
if data is not None and not isinstance(data, dict):
raise InvalidDataTypeError(f"Invalid data type: {type(data)} expected dict")
if params is not None and not isinstance(params, dict):
raise InvalidDataTypeError(
f"Invalid data type: {type(params)} expected dict"
)
try:
response = requests.request(method, api_url, data=data, params=params)
except requests.RequestException as e:
self.log_manager.error(f"API request failed: {e}")
raise APIRequestError(0, str(e))
success_codes = [200, 201, 204]
if response.status_code not in success_codes:
self.log_manager.error(
f"API request failed: {response.status_code} - {response.text}"
)
raise APIRequestError(response.status_code, response.text)
return response.json()
class PostManager:
"""
Manages operations related to posts, including retrieval and insertion of post data into a database via API requests.
Utilizes an instance of ApiRequestHandler for API interactions and LoggingManager for logging operations.
Attributes:
api_request_handler (ApiRequestHandler): Handles the API requests for interacting with post data.
log_manager (LoggingManager): Manages logging for operations performed by PostManager.
"""
def __init__(self, api_request_handler: ApiRequestHandler):
"""
Initializes the PostManager with an API request handler for making API calls and a logging manager for logging.
Parameters:
api_request_handler (ApiRequestHandler): The handler for making API requests.
"""
self.api_request_handler = api_request_handler
self.log_manager = LoggingManager("scraper.log")
def get_post_by_reddit_id(self, reddit_id: str) -> dict:
"""
Retrieves a post by its Reddit ID from the database through an API call.
Parameters:
reddit_id (str): The Reddit ID of the post to retrieve.
Returns:
dict: The response from the API containing the post data.
"""
self.log_manager.log(f"Getting post by reddit id: {reddit_id}")
response = self.api_request_handler.send_api_request(
"GET", f"{self.api_request_handler.api_url}posts/?reddit_id={reddit_id}"
)
return response
def post_exists(self, reddit_id: str) -> bool:
"""
Checks if a post with the specified Reddit ID exists in the database.
Parameters:
reddit_id (str): The Reddit ID of the post to check.
Returns:
bool: True if the post exists, False otherwise.
"""
self.log_manager.log(f"Checking if post exists: {reddit_id}")
response = self.get_post_by_reddit_id(reddit_id)
if len(response) == 0:
return False
return True
def insert_post(self, post) -> dict:
"""
Inserts a new post into the database through an API call.
Parameters:
post (Post): The Post object containing the data to insert.
Returns:
dict: The response from the API after attempting to insert the post data.
"""
self.log_manager.log(f"Inserting post: {post.reddit_id}")
data = {
"reddit_id": post.reddit_id,
"title": post.title,
"name": post.name,
"url": post.url,
"created_utc": post.created_utc,
"selftext": post.selftext,
"permalink": post.permalink,
}
response = self.api_request_handler.send_api_request(
"POST", f"{self.api_request_handler.api_url}posts/", data=data
)
return response
def get_posts_from_last_7_days(self) -> dict:
"""
Retrieves posts from the last 7 days from the database through an API call.
Returns:
dict: The response from the API containing the posts from the last 7 days.
"""
self.log_manager.log("Getting posts from last 7 days")
posts_from_last_7_days = self.api_request_handler.send_api_request(
"GET", f"{self.api_request_handler.api_url}posts/?last_7_days=1"
)
return posts_from_last_7_days
class PostAnalyticsManager:
"""
Manages the analytics for posts by interfacing with the API to check for update requirements
and update post analytics. This class leverages the ApiRequestHandler for API interactions
and the PostManager for retrieving specific post information.
Attributes:
api_request_handler (ApiRequestHandler): Handles API requests for analytics data.
post_manager (PostManager): Manages post retrieval and existence checks.
log_manager (LoggingManager): Manages logging for analytics operations.
"""
def __init__(
self, api_request_handler: ApiRequestHandler, post_manager: PostManager
):
"""
Initializes the PostAnalyticsManager with necessary handlers and managers.
Parameters:
api_request_handler (ApiRequestHandler): The API request handler for making API calls.
post_manager (PostManager): The manager for interacting with post data.
"""
self.api_request_handler = api_request_handler
self.post_manager = post_manager
self.log_manager = LoggingManager("scraper.log")
def check_update_requirements(self, reddit_id: str, update_frequency: int) -> bool:
"""
Checks if the post identified by the given reddit_id meets the requirements for an update
by analyzing the analytics data within the last x seconds (update_frequency).
Parameters:
reddit_id (str): The Reddit ID of the post to check.
update_frequency (int): The frequency in seconds for updating post analytics.
Returns:
bool: True if the post meets update requirements, False otherwise.
"""
self.log_manager.log(f"Checking update requirements for {reddit_id}")
# Specify your desired timezone, e.g., UTC
timezone = ZoneInfo("UTC")
# Make your datetime objects timezone-aware
time_start = datetime.now(timezone) - timedelta(seconds=update_frequency)
now = datetime.now(timezone)
# Format datetime objects for the API request
time_begin_str = time_start.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
time_end_str = now.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
post_id = self.post_manager.get_post_by_reddit_id(reddit_id)
post_id = post_id[0]["id"]
self.log_manager.log(
f"{self.api_request_handler.api_url}post_analytics/?post={post_id}&time_begin={time_begin_str}&time_end={time_end_str}"
)
response = self.api_request_handler.send_api_request(
"GET",
f"{self.api_request_handler.api_url}post_analytics/?post={post_id}&time_begin={time_begin_str}&time_end={time_end_str}",
)
if len(response) > 0:
# post should not be updated
return False
# post should be updated
return True
def update_post_analytics(self, post: Post) -> dict:
"""
Updates the analytics for a given post with new data such as score, number of comments,
and upvote ratio.
Parameters:
post (Post): The post object containing the new analytics data.
Returns:
dict: The response from the API after updating the post's analytics.
"""
self.log_manager.log(f"Updating post analytics for {post.reddit_id}")
post_id = self.post_manager.get_post_by_reddit_id(post.reddit_id)
post_id = post_id[0]["id"]
data = {
"post": post_id,
"score": post.score,
"num_comments": post.num_comments,
"upvote_ratio": post.upvote_ratio,
}
response = self.api_request_handler.send_api_request(
"POST", f"{self.api_request_handler.api_url}post_analytics/", data=data
)
return response
class RedditMonitor:
"""
Monitors Reddit submissions for a specific subreddit, streaming new submissions and
updating existing ones. Utilizes PRAW (Python Reddit API Wrapper) to interact with Reddit's API.
Attributes:
reddit (praw.Reddit): An instance of the PRAW Reddit class for API interactions.
subreddit (praw.models.Subreddit): The subreddit object for the specified subreddit.
log_manager (LoggingManager): Manages logging for Reddit monitoring operations.
"""
def __init__(
self, client_id, client_secret, user_agent, username, password, subreddit_name
):
"""
Initializes the RedditMonitor with credentials for Reddit API access and the target subreddit.
Parameters:
client_id (str): The client ID for the Reddit API application.
client_secret (str): The client secret for the Reddit API application.
user_agent (str): The user agent string identifying the application to Reddit.
username (str): The Reddit account username for authentication.
password (str): The Reddit account password for authentication.
subreddit_name (str): The name of the subreddit to monitor.
"""
self.reddit = praw.Reddit(
client_id=client_id,
client_secret=client_secret,
user_agent=user_agent,
username=username,
password=password,
)
self.subreddit = self.reddit.subreddit(subreddit_name)
self.log_manager = LoggingManager("scraper.log")
def stream_submissions(self):
"""
Streams new submissions from the specified subreddit, yielding each submission
as it becomes available.
Yields:
praw.models.Submission: A submission object representing a Reddit post.
"""
self.log_manager.info("Starting submission stream")
for submission in self.subreddit.stream.submissions():
yield submission
def update_submissions(self, posts_to_update):
"""
Retrieves and yields submissions corresponding to a list of posts that need to be updated,
identified by their Reddit IDs.
Parameters:
posts_to_update (list of dict): A list of dictionaries, each containing the 'reddit_id' of a post to update.
Yields:
praw.models.Submission: A submission object for each post that needs to be updated.
"""
self.log_manager.info("Updating submissions")
for post in posts_to_update:
submission = self.reddit.submission(id=post["reddit_id"])
yield submission
class SubmissionManager:
"""
Manages the processing of Reddit submissions, including conversion to post objects,
checking for updates, and notifying via webhook. It integrates closely with RedditMonitor,
PostManager, and PostAnalyticsManager to streamline the handling of new and existing submissions.
Attributes:
reddit_monitor (RedditMonitor): Monitors and streams Reddit submissions.
post_manager (PostManager): Manages post data interactions.
post_analytics_manager (PostAnalyticsManager): Manages post analytics data.
webhook_notifier (WebhookNotifier): Handles notifications for new or updated posts.
log_manager (LoggingManager): Manages logging for submission processing operations.
"""
def __init__(
self,
reddit_monitor: RedditMonitor,
post_manager: PostManager,
post_analytics_manager: PostAnalyticsManager,
WebhookNotifier,
):
"""
Initializes the SubmissionManager with necessary components for processing submissions.
Parameters:
reddit_monitor (RedditMonitor): The component for monitoring Reddit submissions.
post_manager (PostManager): The component for managing post data.
post_analytics_manager (PostAnalyticsManager): The component for managing post analytics.
WebhookNotifier: The component for sending notifications about posts.
"""
self.reddit_monitor = reddit_monitor
self.post_manager = post_manager
self.post_analytics_manager = post_analytics_manager
self.webhook_notifier = WebhookNotifier
self.log_manager = LoggingManager("scraper.log")
def convert_submission_to_post(self, submission):
"""
Converts a Reddit submission object into a Post object suitable for database insertion
or analytics processing.
Parameters:
submission (praw.models.Submission): The Reddit submission to convert.
Returns:
Post: A Post object populated with data from the Reddit submission.
"""
post = Post(
reddit_id=submission.id,
title=submission.title,
name=submission.name,
url=submission.url,
score=submission.score,
num_comments=submission.num_comments,
created_utc=submission.created_utc,
selftext=submission.selftext,
permalink=submission.permalink,
upvote_ratio=submission.upvote_ratio,
)
return post
def process_submissions(self, submissions, update_frequency=None):
"""
Processes a stream of Reddit submissions, checking for their existence, updating analytics,
and notifying via webhook if necessary. Optionally respects an update frequency to limit updates.
Parameters:
submissions (Iterable[praw.models.Submission]): An iterable of Reddit submission objects to process.
update_frequency (int, optional): The minimum frequency in seconds to update a post's analytics.
"""
for submission in submissions:
self.log_manager.log(submission)
if self.post_manager.post_exists(submission.id):
self.log_manager.log("Post exists")
self.log_manager.log(f"post id: {submission.id}")
if self.post_analytics_manager.check_update_requirements(submission.id, update_frequency):
self.log_manager.log("Update requirements met")
post = self.convert_submission_to_post(submission)
self.post_analytics_manager.update_post_analytics(post)
else:
post = self.convert_submission_to_post(submission)
self.post_manager.insert_post(post)
self.post_analytics_manager.update_post_analytics(post)
self.webhook_notifier.send_notification(post)
class Application:
"""
Orchestrates the main application flow, including starting the submission stream,
managing periodic updates of post analytics, and initializing all necessary components
for the application to function.
Attributes:
reddit_monitor (RedditMonitor): Monitors Reddit for new or updated submissions.
webhook_notifier: Notifies external services via webhooks when certain actions occur.
api_conn: Manages API connections and requests.
post_manager (PostManager): Manages CRUD operations for posts.
post_analytics_manager (PostAnalyticsManager): Manages analytics for posts.
submission_manager (SubmissionManager): Manages the processing of Reddit submissions.
log_manager (LoggingManager): Centralized logging for the application.
scheduler: Manages the scheduling of periodic updates.
thread_manager: Manages threading for asynchronous operations.
update_frequency (int): The frequency, in seconds, at which post analytics should be updated.
"""
def __init__(
self,
reddit_monitor,
webhook_notifier,
api_conn,
post_manager,
post_analytics_manager,
submission_manager,
update_frequency
):
"""
Initializes the application with all necessary components.
Parameters:
reddit_monitor (RedditMonitor): The component for monitoring Reddit submissions.
webhook_notifier: The notifier for sending updates via webhooks.
api_conn: The API connection manager.
post_manager (PostManager): The manager for post operations.
post_analytics_manager (PostAnalyticsManager): The manager for post analytics operations.
submission_manager (SubmissionManager): The manager for processing Reddit submissions.
update_frequency (int): The frequency, in seconds, at which to perform updates.
"""
self.reddit_monitor = reddit_monitor
self.webhook_notifier = webhook_notifier
self.api_conn = api_conn
self.post_manager = post_manager
self.post_analytics_manager = post_analytics_manager
self.log_manager = LoggingManager("scraper.log")
self.submission_manager = submission_manager
self.scheduler = None
self.thread_manager = None
# how often should post analytics be updated (call for update and database update are separate)
self.update_frequency = update_frequency
def periodic_update(self, update_frequency):
"""
Executes periodic updates for post analytics based on a predefined frequency.
Parameters:
update_frequency (int): The frequency, in seconds, at which to perform updates.
"""
self.log_manager.info("Running periodic update")
to_be_updated = self.post_manager.get_posts_from_last_7_days()
submissions = self.reddit_monitor.update_submissions(to_be_updated)
self.submission_manager.process_submissions(submissions, update_frequency)
def run_periodic_update(self):
"""
Initializes and runs the scheduler for periodic updates.
"""
self.scheduler = Scheduler(self.update_frequency, lambda: self.periodic_update(self.update_frequency))
self.scheduler.run()
def run(self):
"""
Starts the main application process, including streaming submissions, running periodic updates,
and processing submissions.
"""
self.log_manager.info("Application started")
self.thread_manager = ThreadManager(
target=self.run_periodic_update, args=()
)
self.thread_manager.run()
submissions = self.reddit_monitor.stream_submissions()
self.submission_manager.process_submissions(submissions, self.update_frequency)