@triposat · Created February 29, 2024 04:09
Python Selenium Series Part-2
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from dataclasses import dataclass, field, fields, InitVar, asdict
import csv
import time
import os

@dataclass
class Product:
    name: str = ""
    price_string: InitVar[str] = ""
    price_gb: float = field(init=False)
    price_usd: float = field(init=False)
    url: str = ""

    def __post_init__(self, price_string):
        self.name = self.clean_name()
        self.price_gb = self.clean_price(price_string)
        self.price_usd = self.convert_price_to_usd()
        self.url = self.create_absolute_url()

    def clean_name(self):
        # Fall back to "missing" so empty names are easy to spot in the CSV
        if self.name == "":
            return "missing"
        return self.name.strip()

    def clean_price(self, price_string):
        # Strip the "Sale price" label and currency symbol the site prepends
        price_string = price_string.strip()
        price_string = price_string.replace("Sale price\n£", "")
        price_string = price_string.replace("Sale price\nFrom £", "")
        if price_string == "":
            return 0.0
        return float(price_string)

    def convert_price_to_usd(self):
        # Fixed GBP -> USD rate; adjust as exchange rates change
        return self.price_gb * 1.21

    def create_absolute_url(self):
        # get_attribute("href") already returns an absolute URL, so just
        # guard against missing values here
        if self.url == "":
            return "missing"
        return self.url
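
# Worked example of the cleaning above (input values are illustrative; the
# "Sale price\n£" prefix matches the text rendered by the target site):
#   Product(name=" Lovely Chocolate ", price_string="Sale price\n£8.50",
#           url="https://www.chocolate.co.uk/products/example")
# yields name="Lovely Chocolate", price_gb=8.5, price_usd=10.285 (8.5 * 1.21).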

class ProductDataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=5):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        # Flag the file as busy so add_product() doesn't trigger a second write
        self.csv_file_open = True
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not products_to_save:
            return

        keys = [field.name for field in fields(products_to_save[0])]
        # Only write the header row the first time the file is created
        file_exists = (
            os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        )
        with open(
            self.csv_filename, mode="a", newline="", encoding="utf-8"
        ) as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for product in products_to_save:
                writer.writerow(asdict(product))
        self.csv_file_open = False

    def clean_raw_product(self, scraped_data):
        return Product(
            name=scraped_data.get("name", ""),
            price_string=scraped_data.get("price", ""),
            url=scraped_data.get("url", ""),
        )

    def is_duplicate(self, product_data):
        if product_data.name in self.names_seen:
            print(f"Duplicate item found: {product_data.name}. Item dropped.")
            return True
        self.names_seen.append(product_data.name)
        return False

    def add_product(self, scraped_data):
        product = self.clean_raw_product(scraped_data)
        if not self.is_duplicate(product):
            self.storage_queue.append(product)
            # Flush to disk once the queue is full and no write is in progress
            if (
                len(self.storage_queue) >= self.storage_queue_limit
                and not self.csv_file_open
            ):
                self.save_to_csv()

    def close_pipeline(self):
        # Give any in-progress write a moment to finish, then flush the rest
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
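
# Batching behaviour, for reference: with the default storage_queue_limit=5,
# the fifth unique product passed to add_product() triggers save_to_csv(),
# which appends the queued rows to the CSV and empties the queue;
# close_pipeline() flushes whatever is left once scraping finishes.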

# Seed URL; start_scrape() appends pagination links to this list as it goes
list_of_urls = [
    "https://www.chocolate.co.uk/collections/all",
]

def start_scrape():
    print("Scraping started...")
    for url in list_of_urls:
        driver.get(url)
        products = driver.find_elements(By.CLASS_NAME, "product-item")
        for product in products:
            name = product.find_element(By.CLASS_NAME, "product-item-meta__title").text
            price = product.find_element(By.CLASS_NAME, "price").text
            product_url = product.find_element(
                By.CLASS_NAME, "product-item-meta__title"
            ).get_attribute("href")
            data_pipeline.add_product({"name": name, "price": price, "url": product_url})
        try:
            # Queue up the next page; appending extends the for-loop above
            next_page = driver.find_element(By.CSS_SELECTOR, "a[rel='next']")
            list_of_urls.append(next_page.get_attribute("href"))
            print("Scraped page", len(list_of_urls), "...")  # Show progress
            time.sleep(1)  # Brief pause between page loads
        except NoSuchElementException:
            print("No more pages found!")

if __name__ == "__main__":
    options = Options()
    options.add_argument("--headless")  # Enables headless mode

    # Use ChromeDriverManager to automatically download and install ChromeDriver
    driver = webdriver.Chrome(
        options=options, service=Service(ChromeDriverManager().install())
    )
    data_pipeline = ProductDataPipeline(csv_filename="product_data.csv")

    start_scrape()
    data_pipeline.close_pipeline()

    print("Scraping completed successfully!")
    driver.quit()  # Close the browser window after finishing
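
# To run this script (assumes Chrome is installed; both packages are on PyPI):
#   pip install selenium webdriver-manager
# Results are appended to product_data.csv with columns name, price_gb,
# price_usd, url (price_string is an InitVar, so fields() excludes it).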