81 lines
3.8 KiB
Python
81 lines
3.8 KiB
Python
from selenium import webdriver
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import TimeoutException
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
from app_log import LoggingManager
|
|
|
|
|
|
class CostcoMonitor:
    """Scrape product listings from a Costco search/catalog page using Chrome.

    The instance owns a Selenium Chrome driver (``self.driver``) and a
    ``LoggingManager`` writing to ``scraper.log`` (``self.log_manager``).
    Call :meth:`close` when finished, or use the instance as a context
    manager so the browser is shut down even if scraping fails::

        with CostcoMonitor(url) as monitor:
            products = monitor.get_products()
    """

    def __init__(self, url, *, timeout=20, headless=True):
        """Create the logger and launch the Chrome driver.

        Args:
            url: Address of the listing page to scrape.
            timeout: Seconds to wait for the page and the product grid
                before giving up. Defaults to the previously hard-coded 20.
            headless: When true (default) Chrome is started with
                ``--headless``; pass ``False`` to see the browser window.
        """
        self.url = url
        self.timeout = timeout

        # Build the logger first: if it fails, no browser process has been
        # started yet, so nothing is left running.
        self.log_manager = LoggingManager("scraper.log")

        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3")
        chrome_options.add_argument("--window-size=1920,1080")
        chrome_options.add_argument("--log-level=3")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        self.driver = webdriver.Chrome(options=chrome_options)

    def __enter__(self):
        """Return the monitor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the browser on leaving the ``with`` block; never suppress errors."""
        self.close()
        return False

    def wait_for_page_load(self):
        """Block until ``document.readyState`` is ``"complete"``.

        A timeout is logged as an error and otherwise ignored, so the caller
        continues with whatever has loaded so far.
        """
        try:
            WebDriverWait(self.driver, self.timeout).until(
                lambda driver: driver.execute_script("return document.readyState") == "complete"
            )
        except TimeoutException:
            self.log_manager.error("Timed out waiting for page to load")

    @staticmethod
    def _first_match(parent, selector):
        """Return the first element under *parent* matching *selector*, or None.

        ``find_elements`` returns an empty list when nothing matches, whereas
        ``find_element`` raises; this makes "element is missing" a value the
        caller can branch on.
        """
        matches = parent.find_elements(By.CSS_SELECTOR, selector)
        return matches[0] if matches else None

    def _extract_product(self, product):
        """Build the detail dict for one product tile.

        SKU and name are required: if either hidden input is absent,
        ``find_element`` raises and the caller skips the tile. Price, image
        and link are optional and fall back to a placeholder string.
        """
        product_sku = product.find_element(By.CSS_SELECTOR, "input[id^='product_sku_']").get_attribute('value')
        product_name = product.find_element(By.CSS_SELECTOR, "input[id^='product_name_']").get_attribute('value')

        price_element = self._first_match(product, "div[class*='price']")
        price = price_element.text if price_element is not None else "Price not found"

        img_element = self._first_match(product, "a.product-image-url img.img-responsive")
        img_url = img_element.get_attribute('src') if img_element is not None else "Image URL not found"

        product_link_element = self._first_match(product, "a.product-image-url")
        product_link = product_link_element.get_attribute('href') if product_link_element is not None else "Product link not found"

        return {
            "sku": product_sku,
            "name": product_name,
            "price": price,
            "img_url": img_url,
            "product_link": product_link,
        }

    def get_products(self):
        """Load ``self.url`` and return the products listed on the page.

        Returns:
            A list of dicts with the keys ``sku``, ``name``, ``price``,
            ``img_url`` and ``product_link``. Tiles that cannot be processed
            are logged and skipped.

        Raises:
            TimeoutException: If the product grid (``div.product-list.grid``)
                does not become visible within ``self.timeout`` seconds.
        """
        self.log_manager.info(f"Loading Costco page: {self.url}")
        self.driver.get(self.url)
        self.wait_for_page_load()

        # Wait for the product list to be visible on the page.
        WebDriverWait(self.driver, self.timeout).until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, "div.product-list.grid"))
        )

        products = self.driver.find_elements(By.CSS_SELECTOR, "div.col-xs-6.col-lg-4.col-xl-3.product")
        self.log_manager.info(f"Found {len(products)} products on the page")

        product_detail_list = []
        for product in products:
            # Best effort per tile: one broken tile must not abort the run.
            try:
                details = self._extract_product(product)
                product_detail_list.append(details)
                # NOTE(review): this uses .log() while every other call uses
                # .info()/.error() - confirm LoggingManager provides .log().
                self.log_manager.log(
                    f"SKU: {details['sku']}, Name: {details['name']}, Price: {details['price']}, "
                    f"Image URL: {details['img_url']}, Product Link: {details['product_link']}"
                )
            except Exception as e:
                self.log_manager.error(f"Error processing product: {e}")

        return product_detail_list

    def close(self):
        """Quit the Chrome driver and log that the browser was closed."""
        self.driver.quit()
        self.log_manager.info("Browser closed")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: scrape one search page; get_products() logs each
    # product it finds, so the returned list is not used here.
    url = "https://www.costco.com/CatalogSearch?dept=All&keyword=pokemon"
    monitor = CostcoMonitor(url)
    try:
        monitor.get_products()
    finally:
        # Quit the browser even when scraping raises (e.g. the wait for the
        # product list times out); otherwise the Chrome process stays running.
        monitor.close()
|