Created: February 29, 2024, 15:49
-
-
Save triposat/63c21f3d1383e1ad4d232d2febbbdc79 to your computer and use it in GitHub Desktop.
Python Selenium Series Part-3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import json
import os
import time
from dataclasses import InitVar, asdict, dataclass, field, fields

import boto3
import mysql.connector
import psycopg2
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
@dataclass
class Product:
    """One scraped chocolate product with cleaned, derived price fields.

    ``price_string`` is the raw scraped price text (init-only); the
    cleaned GBP price, a USD conversion and a cleaned name/URL are
    derived in ``__post_init__``.
    """

    # Exchange rate for the GBP -> USD conversion. Kept as a plain
    # (unannotated) class attribute so the dataclass machinery ignores it;
    # previously this was a magic number buried in convert_price_to_usd.
    GBP_TO_USD_RATE = 1.21

    name: str = ""
    price_string: InitVar[str] = ""  # raw scraped price text, not stored
    price_gb: float = field(init=False)
    price_usd: float = field(init=False)
    url: str = ""

    def __post_init__(self, price_string):
        """Derive the cleaned fields from the raw scraped values."""
        self.name = self.clean_name()
        self.price_gb = self.clean_price(price_string)
        self.price_usd = self.convert_price_to_usd()
        self.url = self.create_absolute_url()

    def clean_name(self):
        """Return the stripped product name, or "missing" when empty.

        Also treats whitespace-only names as missing (the original let
        them through as empty strings).
        """
        name = self.name.strip()
        return name if name else "missing"

    def clean_price(self, price_string):
        """Parse the scraped price text into a GBP float.

        Strips the "Sale price" prefixes the shop renders. Returns 0.0
        for empty or unparseable input instead of raising, so one odd
        price cannot kill the whole scrape (the original crashed with
        ValueError on unexpected formats).
        """
        price_string = price_string.strip()
        price_string = price_string.replace("Sale price\nFrom £", "")
        price_string = price_string.replace("Sale price\n£", "")
        if not price_string:
            return 0.0
        try:
            return float(price_string)
        except ValueError:
            return 0.0

    def convert_price_to_usd(self):
        """Convert the GBP price to USD, rounded to 2 decimal places."""
        return round(self.price_gb * self.GBP_TO_USD_RATE, 2)

    def create_absolute_url(self):
        """Return the product URL, or "missing" when none was scraped."""
        return self.url if self.url else "missing"
class ProductDataPipeline:
    """Collects scraped products, de-duplicates them by name, and flushes
    them in batches to CSV, JSON, MySQL, PostgreSQL and (on demand) an
    AWS S3 bucket.

    NOTE(review): every ``save_to_*`` method reads ``self.storage_queue``
    and only ``save_to_csv`` clears it, so the CSV saver must run LAST in
    a flush — ``_flush`` encodes that ordering in one place.
    """

    def __init__(
        self,
        csv_filename="",
        json_filename="",
        mysql_db_name="",
        postgre_db_name="",
        aws_s3_bucket="",
        storage_queue_limit=5,
    ):
        self.names_seen = []      # product names already accepted (dedup)
        self.storage_queue = []   # cleaned products waiting to be flushed
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.json_filename = json_filename
        self.mysql_db_name = mysql_db_name
        self.postgre_db_name = postgre_db_name
        self.aws_s3_bucket = aws_s3_bucket
        self.data_stored = False  # True once at least one batch was saved

    def save_to_csv(self):
        """Append the queued products to the CSV file and clear the queue.

        Writes a header row only when the file is new/empty.
        """
        products_to_save = list(self.storage_queue)
        # Clearing here is what ends a flush cycle — see class docstring.
        self.storage_queue.clear()
        if not products_to_save:
            return
        keys = [f.name for f in fields(products_to_save[0])]
        file_exists = (
            os.path.isfile(self.csv_filename)
            and os.path.getsize(self.csv_filename) > 0
        )
        with open(
            self.csv_filename, mode="a", newline="", encoding="utf-8"
        ) as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            writer.writerows(asdict(product) for product in products_to_save)

    def save_to_json(self):
        """Merge the queued products into the JSON file.

        Reads any existing array, extends it, and rewrites the file.
        Does NOT clear the queue (save_to_csv does that).
        """
        json_data = [asdict(product) for product in self.storage_queue]
        if not json_data:
            return
        try:
            with open(self.json_filename, mode="r", encoding="utf-8") as f:
                existing_data = json.load(f)
        except FileNotFoundError:
            existing_data = []
        existing_data.extend(json_data)
        with open(self.json_filename, mode="w", encoding="utf-8") as f:
            json.dump(existing_data, f, indent=2)

    def save_to_mysql(self):
        """Insert the queued products into MySQL (queue not cleared).

        NOTE(review): hard-coded credentials — move to environment
        variables or a config file before shipping.
        """
        products_to_save = list(self.storage_queue)
        if not products_to_save:
            return
        self.connection = mysql.connector.connect(
            host="localhost",
            user="root",
            port="3306",
            password="mypass@654",
            database=self.mysql_db_name,
        )
        try:
            with self.connection.cursor() as cursor:
                for product in products_to_save:
                    item = asdict(product)
                    cursor.execute(
                        """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                        (item["name"], item["price_gb"],
                         item["price_usd"], item["url"]),
                    )
            self.connection.commit()
        finally:
            # The original leaked the connection; always release it.
            self.connection.close()

    def save_to_postgresql(self):
        """Insert the queued products into PostgreSQL (queue not cleared).

        NOTE(review): hard-coded credentials — same caveat as MySQL.
        """
        products_to_save = list(self.storage_queue)
        if not products_to_save:
            return
        self.connection = psycopg2.connect(
            host="localhost",
            database=self.postgre_db_name,
            user="postgres",
            password="mypass@654",
        )
        try:
            with self.connection.cursor() as cursor:
                for product in products_to_save:
                    item = asdict(product)
                    cursor.execute(
                        """ insert into chocolate_products ( name, price_gb, price_usd, url) values (%s,%s,%s,%s)""",
                        (item["name"], item["price_gb"],
                         item["price_usd"], item["url"]),
                    )
            self.connection.commit()
        finally:
            # The original leaked the connection; always release it.
            self.connection.close()

    def save_to_s3_bucket(self):
        """Upload the previously written CSV file to the S3 bucket.

        NOTE(review): placeholder credentials — prefer boto3's default
        credential chain (env vars / ~/.aws) over literals.
        """
        aws_access_key_id = "YOUR_ACCESS_KEY"
        aws_secret_access_key = "YOUR_SECRET_KEY"
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        # 'with' fixes the original's leaked file handle.
        with open("product_data.csv", "rb") as body:
            response = s3_client.put_object(
                Bucket=self.aws_s3_bucket,
                Key="chocolate_data.csv",
                Body=body,
            )
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status == 200:
            print("Successfully uploaded data to S3 bucket!")
        else:
            print(f"Failed to upload data to S3 bucket. Status code: {status}")

    def clean_raw_product(self, scraped_data):
        """Convert a raw scraped dict into a cleaned Product."""
        return Product(
            name=scraped_data.get("name", ""),
            price_string=scraped_data.get("price", ""),
            url=scraped_data.get("url", ""),
        )

    def is_duplicate(self, product_data):
        """Return True if this product name was already seen (item dropped);
        otherwise record the name and return False."""
        if product_data.name in self.names_seen:
            print(f"Duplicate item found: {product_data.name}. Item dropped.")
            return True
        self.names_seen.append(product_data.name)
        return False

    def add_product(self, scraped_data):
        """Clean, de-duplicate and queue one scraped product; flush all
        sinks once the queue reaches storage_queue_limit."""
        product = self.clean_raw_product(scraped_data)
        if not self.is_duplicate(product):
            self.storage_queue.append(product)
            if len(self.storage_queue) >= self.storage_queue_limit:
                self._flush()

    def _flush(self):
        """Write the queue to every sink (CSV last — it clears the queue)
        and record that data was stored."""
        self.save_to_json()
        self.save_to_mysql()
        self.save_to_postgresql()
        self.save_to_csv()
        self.data_stored = True

    def close_pipeline(self):
        """Flush any remaining products and report the outcome.

        Bug fix: the original never set ``data_stored`` when flushing the
        leftovers here, so a small scrape (< storage_queue_limit items)
        saved its data but still printed "No data to save".
        """
        if self.storage_queue:
            self._flush()
        if self.data_stored:
            print("Data pipeline closed. Saved data to files and databases.")
        else:
            print("No data to save. Closing data pipeline.")
# Seed URL(s) for the crawl; start_scrape appends each discovered
# "next page" link to this list as it goes.
list_of_urls = ["https://www.chocolate.co.uk/collections/all"]
def start_scrape():
    """Scrape every URL in list_of_urls, queueing each product into the
    pipeline and appending the next page (if any) to the work list.

    Relies on the module-level ``driver`` and ``data_pipeline``.
    Pagination works by appending to ``list_of_urls`` while iterating it,
    which is well-defined for Python lists.
    """
    for page_url in list_of_urls:
        driver.get(page_url)
        for product in driver.find_elements(By.CLASS_NAME, "product-item"):
            name = product.find_element(
                By.CLASS_NAME, "product-item-meta__title").text
            price = product.find_element(By.CLASS_NAME, "price").text
            # Renamed from 'url': the original shadowed its loop variable.
            product_url = product.find_element(
                By.CLASS_NAME, "product-item-meta__title"
            ).get_attribute("href")
            data_pipeline.add_product(
                {"name": name, "price": price, "url": product_url})
        try:
            # find_element raises when the link is absent, so the
            # original's 'if next_page:' check was dead code.
            next_page = driver.find_element(By.CSS_SELECTOR, "a[rel='next']")
            list_of_urls.append(next_page.get_attribute("href"))
            print("Scraped page", len(list_of_urls), "...")
            time.sleep(1)  # be polite to the server between pages
        except NoSuchElementException:
            # Narrowed from a bare 'except:' that swallowed everything,
            # including KeyboardInterrupt.
            print("No more pages found!")
# Script entry point: launch a headless Chrome, scrape all product pages,
# flush the pipeline to files/databases, then upload the CSV to S3.
if __name__ == "__main__":
    options = Options()
    options.add_argument("--headless")  # Enables headless mode
    # Using ChromedriverManager to automatically download and install Chromedriver
    driver = webdriver.Chrome(
        options=options, service=Service(ChromeDriverManager().install())
    )
    data_pipeline = ProductDataPipeline(
        csv_filename="product_data.csv",
        json_filename="product_data.json",
        mysql_db_name="chocolatedb",
        postgre_db_name="chocolatedb",
        aws_s3_bucket="chocolate-products",
    )
    start_scrape()
    data_pipeline.close_pipeline()
    # NOTE(review): this prints unconditionally, even when nothing was
    # scraped or a save failed — the real outcome is reported by
    # close_pipeline above.
    print("Congratulations! Data saved successfully on MySQL, PostgreSQL, and JSON.")
    # NOTE(review): purpose of this pause is unclear (the CSV is already
    # written synchronously) — presumably just a visual delay; confirm.
    time.sleep(3)
    data_pipeline.save_to_s3_bucket()
    driver.quit()  # Close the browser window after finishing
    print("Scraping completed successfully. Browser closed.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment