Created
February 29, 2024 04:09
-
-
Save triposat/22c386ef37d82e0a32f71dba2aeac4e8 to your computer and use it in GitHub Desktop.
Python Selenium Series Part-2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import os
import time
from dataclasses import InitVar, asdict, dataclass, field, fields

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
@dataclass
class Product:
    """One scraped product, normalised from raw page text.

    Attributes:
        name: Cleaned product name, or "missing" when absent/blank.
        price_gb: Price in GBP parsed from the raw price string (0.0 if unparseable).
        price_usd: GBP price converted to USD.
        url: Product URL, or "missing" when absent.
    """

    name: str = ""
    # Raw price text as scraped (e.g. "Sale price\n£9.95"); consumed by
    # __post_init__ and not stored on the instance.
    price_string: InitVar[str] = ""
    price_gb: float = field(init=False)
    price_usd: float = field(init=False)
    url: str = ""

    def __post_init__(self, price_string):
        """Derive the cleaned/converted fields from the raw scraped inputs."""
        self.name = self.clean_name()
        self.price_gb = self.clean_price(price_string)
        self.price_usd = self.convert_price_to_usd()
        self.url = self.create_absolute_url()

    def clean_name(self):
        """Return the stripped name, or "missing" when empty/whitespace-only."""
        cleaned = self.name.strip()
        return cleaned if cleaned else "missing"

    def clean_price(self, price_string):
        """Parse the scraped price text into a GBP float.

        Strips the known "Sale price" prefixes. Returns 0.0 when the
        remaining text is empty or not a valid number, so one malformed
        price (e.g. "Sold out") cannot crash the whole scrape.
        """
        price_string = price_string.strip()
        price_string = price_string.replace("Sale price\n£", "")
        price_string = price_string.replace("Sale price\nFrom £", "")
        if not price_string:
            return 0.0
        try:
            return float(price_string)
        except ValueError:
            # Malformed/non-numeric price text -> treat as unknown.
            return 0.0

    def convert_price_to_usd(self, rate=1.21):
        """Return price_gb converted to USD at the given exchange rate.

        The default rate (1.21) preserves the original behaviour; pass a
        different rate to override the static snapshot.
        """
        return self.price_gb * rate

    def create_absolute_url(self):
        """Return the product URL, or "missing" when none was scraped."""
        if self.url == "":
            return "missing"
        return self.url
class ProductDataPipeline:
    """Buffers cleaned Product records and flushes them to a CSV file.

    Products are queued in memory and appended to ``csv_filename`` once
    ``storage_queue_limit`` items accumulate, and again on close.
    """

    def __init__(self, csv_filename="", storage_queue_limit=5):
        # Names already emitted; a set makes the duplicate check O(1)
        # instead of a linear scan over an ever-growing list.
        self.names_seen = set()
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        # Crude re-entrancy guard: add_product() skips flushing while a
        # flush is already in progress.
        self.csv_file_open = False

    def save_to_csv(self):
        """Flush all queued products to the CSV file (append mode)."""
        self.csv_file_open = True
        try:
            products_to_save = list(self.storage_queue)
            self.storage_queue.clear()
            if not products_to_save:
                return
            keys = [f.name for f in fields(products_to_save[0])]
            # Only write the header when the file is new or empty.
            file_exists = (
                os.path.isfile(self.csv_filename)
                and os.path.getsize(self.csv_filename) > 0
            )
            with open(
                self.csv_filename, mode="a", newline="", encoding="utf-8"
            ) as output_file:
                writer = csv.DictWriter(output_file, fieldnames=keys)
                if not file_exists:
                    writer.writeheader()
                writer.writerows(asdict(product) for product in products_to_save)
        finally:
            # Always reset the flag, even on early return or write error;
            # otherwise add_product() would never trigger a flush again.
            self.csv_file_open = False

    def clean_raw_product(self, scraped_data):
        """Convert a raw scraped dict into a cleaned Product."""
        return Product(
            name=scraped_data.get("name", ""),
            price_string=scraped_data.get("price", ""),
            url=scraped_data.get("url", ""),
        )

    def is_duplicate(self, product_data):
        """Return True if this product name was already seen.

        NOTE: also records the name as seen when it is new (side effect).
        """
        if product_data.name in self.names_seen:
            print(f"Duplicate item found: {product_data.name}. Item dropped.")
            return True
        self.names_seen.add(product_data.name)
        return False

    def add_product(self, scraped_data):
        """Clean, de-duplicate and queue one scraped product.

        Triggers a CSV flush once the queue reaches its limit, unless a
        flush is already underway.
        """
        product = self.clean_raw_product(scraped_data)
        if not self.is_duplicate(product):
            self.storage_queue.append(product)
            if (
                len(self.storage_queue) >= self.storage_queue_limit
                and not self.csv_file_open
            ):
                self.save_to_csv()

    def close_pipeline(self):
        """Flush any remaining queued products before shutdown."""
        if self.csv_file_open:
            # Give an in-flight flush a moment to finish.
            time.sleep(3)
        if self.storage_queue:
            self.save_to_csv()
# Seed URL(s) for the scrape; start_scrape() appends further pagination
# URLs to this list at runtime.
list_of_urls = ["https://www.chocolate.co.uk/collections/all"]
def start_scrape():
    """Scrape every URL in ``list_of_urls``, following pagination links.

    Relies on the module-level ``driver`` and ``data_pipeline``. Each
    page's "next" link is appended to ``list_of_urls`` while the list is
    being iterated — safe for Python lists — which drives pagination.
    """
    print("Scraping started...")
    for url in list_of_urls:
        driver.get(url)
        products = driver.find_elements(By.CLASS_NAME, "product-item")
        for product in products:
            name = product.find_element(By.CLASS_NAME, "product-item-meta__title").text
            price = product.find_element(By.CLASS_NAME, "price").text
            # Use a distinct name: assigning to ``url`` here would
            # clobber the outer pagination loop variable.
            product_url = product.find_element(
                By.CLASS_NAME, "product-item-meta__title"
            ).get_attribute("href")
            data_pipeline.add_product({"name": name, "price": price, "url": product_url})
        try:
            # find_element raises (never returns a falsy value) when the
            # link is absent, so no extra truthiness check is needed.
            next_page = driver.find_element(By.CSS_SELECTOR, "a[rel='next']")
            list_of_urls.append(next_page.get_attribute("href"))
            print("Scraped page", len(list_of_urls), "...")  # Show progress
            time.sleep(1)  # Add a brief pause between page loads
        except NoSuchElementException:
            # No "next" link on the final page: pagination is finished.
            # Catching only this exception (not a bare except) keeps
            # WebDriver errors and KeyboardInterrupt visible.
            print("No more pages found!")
if __name__ == "__main__":
    options = Options()
    options.add_argument("--headless")  # Run Chrome without a visible window
    # ChromeDriverManager automatically downloads a matching chromedriver.
    driver = webdriver.Chrome(
        options=options, service=Service(ChromeDriverManager().install())
    )
    data_pipeline = ProductDataPipeline(csv_filename="product_data.csv")
    try:
        start_scrape()
        data_pipeline.close_pipeline()
        print("Scraping completed successfully!")
    finally:
        # Always release the browser process, even when the scrape
        # raises — otherwise a headless Chrome instance is leaked.
        driver.quit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment