Python Selenium Series Part-3
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from dataclasses import dataclass, field, fields, InitVar, asdict
import csv
import json
import time
import os
import boto3
import psycopg2
import mysql.connector


@dataclass
class Product:
    """Holds one scraped product and normalises its raw fields on creation."""

    name: str = ""
    price_string: InitVar[str] = ""
    price_gb: float = field(init=False)
    price_usd: float = field(init=False)
    url: str = ""

    def __post_init__(self, price_string):
        self.name = self.clean_name()
        self.price_gb = self.clean_price(price_string)
        self.price_usd = self.convert_price_to_usd()
        self.url = self.create_absolute_url()

    def clean_name(self):
        if self.name == "":
            return "missing"
        return self.name.strip()

    def clean_price(self, price_string):
        # Strip the "Sale price" label and pound sign that Selenium returns as part of the text.
        price_string = price_string.strip()
        price_string = price_string.replace("Sale price\n£", "")
        price_string = price_string.replace("Sale price\nFrom £", "")
        if price_string == "":
            return 0.0
        return float(price_string)

    def convert_price_to_usd(self):
        # Fixed GBP -> USD conversion rate of 1.21.
        return round(self.price_gb * 1.21, 2)

    def create_absolute_url(self):
        if self.url == "":
            return "missing"
        return self.url
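
# A quick illustration (hypothetical values) of how Product normalises raw scraped fields:
#   Product(name=" Bounty ", price_string="Sale price\n£4.25", url="https://www.chocolate.co.uk/products/bounty")
#   -> name="Bounty", price_gb=4.25, price_usd=5.14 (4.25 * 1.21 rounded to 2 dp), url kept as-is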


class ProductDataPipeline:
    """Buffers cleaned Product records and flushes them to CSV, JSON, MySQL, PostgreSQL and S3."""

    def __init__(
        self,
        csv_filename="",
        json_filename="",
        mysql_db_name="",
        postgre_db_name="",
        aws_s3_bucket="",
        storage_queue_limit=5,
    ):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.json_filename = json_filename
        self.mysql_db_name = mysql_db_name
        self.postgre_db_name = postgre_db_name
        self.aws_s3_bucket = aws_s3_bucket
        self.data_stored = False  # Flag to track if data is stored

    def save_to_csv(self):
        # Note: this is the only writer that clears the queue, so it must run last in each flush.
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not products_to_save:
            return

        keys = [field.name for field in fields(products_to_save[0])]
        file_exists = (
            os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        )
        with open(
            self.csv_filename, mode="a", newline="", encoding="utf-8"
        ) as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for product in products_to_save:
                writer.writerow(asdict(product))

    def save_to_json(self):
        products_to_save = []
        json_data = []
        products_to_save.extend(self.storage_queue)
        if not products_to_save:
            return
        for product in products_to_save:
            json_data.append(asdict(product))
        try:
            with open(self.json_filename, mode="r", encoding="utf-8") as output_file:
                existing_data = json.load(output_file)
        except FileNotFoundError:
            existing_data = []
        existing_data.extend(json_data)
        with open(self.json_filename, mode="w", encoding="utf-8") as output_file:
            json.dump(existing_data, output_file, indent=2)

    def save_to_mysql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        if not products_to_save:
            return
        self.connection = mysql.connector.connect(
            host="localhost",
            user="root",
            port="3306",
            password="mypass@654",
            database=self.mysql_db_name,
        )
        # Create cursor object
        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                cursor.execute(
                    """INSERT INTO chocolate_products (name, price_gb, price_usd, url) VALUES (%s, %s, %s, %s)""",
                    (item["name"], item["price_gb"], item["price_usd"], item["url"]),
                )
            self.connection.commit()

    def save_to_postgresql(self):
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        if not products_to_save:
            return
        self.connection = psycopg2.connect(
            host="localhost",
            database=self.postgre_db_name,
            user="postgres",
            password="mypass@654",
        )
        with self.connection.cursor() as cursor:
            for product in products_to_save:
                item = asdict(product)
                cursor.execute(
                    """INSERT INTO chocolate_products (name, price_gb, price_usd, url) VALUES (%s, %s, %s, %s)""",
                    (item["name"], item["price_gb"], item["price_usd"], item["url"]),
                )
            self.connection.commit()
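
    # Both database writers above assume a `chocolate_products` table already exists.
    # A minimal schema matching the INSERT statements might look like this (the column
    # types are an assumption for illustration, not part of the original script):
    #
    #   CREATE TABLE chocolate_products (
    #       id SERIAL PRIMARY KEY,      -- e.g. INT AUTO_INCREMENT PRIMARY KEY on MySQL
    #       name VARCHAR(255),
    #       price_gb DECIMAL(10, 2),
    #       price_usd DECIMAL(10, 2),
    #       url TEXT
    #   );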

    def save_to_s3_bucket(self):
        # Replace these placeholders with real AWS credentials (or use a configured AWS profile).
        aws_access_key_id = "YOUR_ACCESS_KEY"
        aws_secret_access_key = "YOUR_SECRET_KEY"
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        with open("product_data.csv", "rb") as data_file:
            response = s3_client.put_object(
                Bucket=self.aws_s3_bucket,
                Key="chocolate_data.csv",
                Body=data_file,
            )
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status == 200:
            print("Successfully uploaded data to S3 bucket!")
        else:
            print(f"Failed to upload data to S3 bucket. Status code: {status}")

    def clean_raw_product(self, scraped_data):
        return Product(
            name=scraped_data.get("name", ""),
            price_string=scraped_data.get("price", ""),
            url=scraped_data.get("url", ""),
        )

    def is_duplicate(self, product_data):
        if product_data.name in self.names_seen:
            print(f"Duplicate item found: {product_data.name}. Item dropped.")
            return True
        self.names_seen.append(product_data.name)
        return False

    def add_product(self, scraped_data):
        product = self.clean_raw_product(scraped_data)
        if not self.is_duplicate(product):
            self.storage_queue.append(product)
            if len(self.storage_queue) >= self.storage_queue_limit:
                # save_to_csv() clears the queue, so it runs after the other writers.
                self.save_to_json()
                self.save_to_mysql()
                self.save_to_postgresql()
                self.save_to_csv()
                self.data_stored = True  # Set flag to True when data is stored

    def close_pipeline(self):
        if len(self.storage_queue) > 0:
            self.save_to_json()
            self.save_to_mysql()
            self.save_to_postgresql()
            self.save_to_csv()
            self.data_stored = True  # Remaining items were flushed on close
        if self.data_stored:  # Check whether any data was stored before printing
            print("Data pipeline closed. Saved data to files and databases.")
        else:
            print("No data to save. Closing data pipeline.")


list_of_urls = [
    "https://www.chocolate.co.uk/collections/all",
]


def start_scrape():
    # New page URLs are appended to list_of_urls while it is being iterated,
    # so pagination continues until no "next" link is found.
    for url in list_of_urls:
        driver.get(url)
        products = driver.find_elements(By.CLASS_NAME, "product-item")
        for product in products:
            name = product.find_element(
                By.CLASS_NAME, "product-item-meta__title"
            ).text
            price = product.find_element(By.CLASS_NAME, "price").text
            url = product.find_element(
                By.CLASS_NAME, "product-item-meta__title"
            ).get_attribute("href")
            data_pipeline.add_product({"name": name, "price": price, "url": url})
        try:
            next_page = driver.find_element(By.CSS_SELECTOR, "a[rel='next']")
            if next_page:
                list_of_urls.append(next_page.get_attribute("href"))
                print("Scraped page", len(list_of_urls), "...")
                time.sleep(1)
        except NoSuchElementException:
            print("No more pages found!")
if __name__ == "__main__":
options = Options()
options.add_argument("--headless") # Enables headless mode
# Using ChromedriverManager to automatically download and install Chromedriver
driver = webdriver.Chrome(
options=options, service=Service(ChromeDriverManager().install())
)
data_pipeline = ProductDataPipeline(
csv_filename="product_data.csv",
json_filename="product_data.json",
mysql_db_name="chocolatedb",
postgre_db_name="chocolatedb",
aws_s3_bucket="chocolate-products",
)
start_scrape()
data_pipeline.close_pipeline()
print("Congratulations! Data saved successfully on MySQL, PostgreSQL, and JSON.")
time.sleep(3)
data_pipeline.save_to_s3_bucket()
driver.quit() # Close the browser window after finishing
print("Scraping completed successfully. Browser closed.")