# File viewer banner: 94 lines, 4.5 KiB, Python
import os
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import TimeoutException
|
|
from app_log import LoggingManager
|
|
from models import Product
|
|
|
|
|
|
|
|
class CostcoMonitor:
    """Scrape product entries from a Costco listing page using Chrome.

    The monitor owns a Selenium Chrome driver and a ``LoggingManager``
    writing to ``scraper.log``. Call :meth:`get_products` to load the
    configured URL and collect the listed products, and :meth:`close`
    when finished so the browser process is shut down.
    """

    # Seconds to wait for the document and for the product list to appear.
    WAIT_TIMEOUT = 20
    # Extra attempts made after the first product-list timeout.
    MAX_RETRIES = 3
    # Container that holds the product tiles on the listing page.
    PRODUCT_LIST_XPATH = "//div[@automation-id='productList']"
    # Desktop browser identity sent with every request.
    USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/58.0.3029.110 Safari/537.3"
    )

    def __init__(self, url: str, *, headless: bool = True) -> None:
        """Create the logger and start a Chrome session.

        Args:
            url: Address of the listing page that ``get_products`` loads.
            headless: Run Chrome without a visible window (the default).
                Pass ``False`` to watch the browser while debugging.
        """
        self.url = url
        # Build the logger first: if it fails, no browser process has been
        # started yet, so nothing is left running.
        self.log_manager = LoggingManager("scraper.log")

        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        chrome_options.add_argument(f"user-agent={self.USER_AGENT}")
        chrome_options.add_argument("--window-size=1920,1080")
        chrome_options.add_argument("--log-level=3")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        if os.name == "nt":
            # Only passed on Windows, as in the original configuration.
            chrome_options.add_argument("--disable-gpu")
        self.driver = webdriver.Chrome(options=chrome_options)

    def wait_for_page_load(self) -> None:
        """Wait until ``document.readyState`` reports ``"complete"``.

        A timeout is logged as an error and otherwise ignored, so the
        caller continues with whatever has loaded so far.
        """
        try:
            WebDriverWait(self.driver, self.WAIT_TIMEOUT).until(
                lambda driver: driver.execute_script("return document.readyState") == "complete"
            )
        except TimeoutException:
            self.log_manager.error("Timed out waiting for page to load")

    def _product_list_visible(self) -> bool:
        """Return True once the product list container is visible, False on timeout."""
        try:
            WebDriverWait(self.driver, self.WAIT_TIMEOUT).until(
                EC.visibility_of_element_located((By.XPATH, self.PRODUCT_LIST_XPATH))
            )
        except TimeoutException:
            return False
        return True

    @staticmethod
    def _extract_fields(product) -> tuple:
        """Read (sku, name, price, image URL, link) from one product tile.

        ``find_element`` raises when a node is missing rather than returning
        a falsy value, so no "not found" fallbacks are needed here; the
        caller handles the exception. Missing attribute values become "".
        """
        sku = product.find_element(By.CSS_SELECTOR, "input[id^='product_sku_']").get_attribute('value')
        name = product.find_element(By.CSS_SELECTOR, "input[id^='product_name_']").get_attribute('value')
        price = product.find_element(By.CSS_SELECTOR, "div[class*='price']").text
        img_url = product.find_element(
            By.CSS_SELECTOR, "a.product-image-url img.img-responsive"
        ).get_attribute('src')
        link = product.find_element(By.CSS_SELECTOR, "a.product-image-url").get_attribute('href')
        # Normalise None / empty results to blank strings.
        return tuple(value or "" for value in (sku, name, price, img_url, link))

    def get_products(self, retries=0) -> list[Product]:
        """Load the listing page and return the products found on it.

        Args:
            retries: Number of attempts already made. Callers normally
                leave this at 0; it is incremented on each retry.

        Returns:
            One ``Product`` per tile that could be parsed. Tiles that fail
            to parse are logged and skipped. An empty list is returned when
            the product list never became visible after ``MAX_RETRIES``
            additional attempts.
        """
        self.log_manager.info(f"Loading Costco page: {self.url}")
        self.driver.get(self.url)
        self.wait_for_page_load()

        self.log_manager.info("Waiting for product")
        if not self._product_list_visible():
            self.log_manager.error("Timed out waiting for product list to load")
            if retries < self.MAX_RETRIES:
                self.log_manager.info("Retrying...")
                # Return the retry's result; without this the outer call
                # would discard it and scrape the page a second time.
                return self.get_products(retries + 1)
            self.log_manager.error(f"Failed to load product list after {self.MAX_RETRIES} retries")
            return []

        products = self.driver.find_elements(
            By.XPATH, f"{self.PRODUCT_LIST_XPATH}/div[contains(@class, 'product')]"
        )
        self.log_manager.info(f"Found {len(products)} products on the page")

        product_detail_list = []
        for product in products:
            # Best effort per tile: one malformed tile must not abort the run.
            try:
                product_sku, product_name, price, img_url, product_link = self._extract_fields(product)
                product_detail_list.append(Product(product_sku, product_name, price, img_url, product_link))
                # NOTE(review): this is the only call to ``log_manager.log``;
                # confirm LoggingManager provides it alongside info/error.
                self.log_manager.log(f"SKU: {product_sku}, Name: {product_name}, Price: {price}, Image URL: {img_url}, Product Link: {product_link}")
            except Exception as e:
                self.log_manager.error(f"Error processing product: {e}")

        return product_detail_list

    def close(self) -> None:
        """Quit the browser session and log that it was closed."""
        self.driver.quit()
        self.log_manager.info("Browser closed")
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: scrape one search page and log what was found.
    url = "https://www.costco.com/CatalogSearch?dept=All&keyword=bagels"
    monitor = CostcoMonitor(url)
    try:
        monitor.get_products()
    finally:
        # Always shut the browser down, even if scraping raised, so no
        # Chrome process is left running.
        monitor.close()
|