Python Beginner Series Part 2
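A small end-to-end scraping pipeline: it fetches product listings from chocolate.co.uk with requests and BeautifulSoup, normalizes each item into a Product dataclass, drops duplicates by name, and appends batched rows to a CSV file.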
import os
import time
import csv
import requests
from bs4 import BeautifulSoup
from dataclasses import dataclass, field, fields, InitVar, asdict

@dataclass
class Product:
    # Raw scraped fields; price_string is an InitVar, so it is only used to
    # derive the two price fields below and is never stored on the instance.
    name: str = ""
    price_string: InitVar[str] = ""
    price_gb: float = field(init=False)
    price_usd: float = field(init=False)
    url: str = ""

    def __post_init__(self, price_string):
        # Normalize every field as soon as the object is constructed.
        self.name = self.clean_name()
        self.price_gb = self.clean_price(price_string)
        self.price_usd = self.convert_price_to_usd()
        self.url = self.create_absolute_url()

    def clean_name(self):
        if self.name == "":
            return "missing"
        return self.name.strip()

    def clean_price(self, price_string):
        # Strip the site's "Sale price" prefixes, then parse the remainder.
        price_string = price_string.strip()
        price_string = price_string.replace("Sale price£", "")
        price_string = price_string.replace("Sale priceFrom £", "")
        if price_string == "":
            return 0.0
        return float(price_string)

    def convert_price_to_usd(self):
        # Fixed GBP -> USD exchange rate; update as needed.
        return self.price_gb * 1.21

    def create_absolute_url(self):
        if self.url == "":
            return "missing"
        return "https://www.chocolate.co.uk" + self.url

class ProductDataPipeline:
    def __init__(self, csv_filename="", storage_queue_limit=5):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        # Drain the queue first so new products can keep arriving.
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not products_to_save:
            return
        # Set the flag only after the empty-queue early return, so it can't
        # be left stuck at True when there is nothing to write.
        self.csv_file_open = True
        keys = [f.name for f in fields(products_to_save[0])]
        # Only write the header row when the file is new or empty.
        file_exists = (
            os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        )
        with open(
            self.csv_filename, mode="a", newline="", encoding="utf-8"
        ) as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for product in products_to_save:
                writer.writerow(asdict(product))
        self.csv_file_open = False

    def clean_raw_product(self, scraped_data):
        return Product(
            name=scraped_data.get("name", ""),
            price_string=scraped_data.get("price", ""),
            url=scraped_data.get("url", ""),
        )

    def is_duplicate(self, product_data):
        # Deduplicate on the cleaned product name.
        if product_data.name in self.names_seen:
            print(f"Duplicate item found: {product_data.name}. Item dropped.")
            return True
        self.names_seen.append(product_data.name)
        return False

    def add_product(self, scraped_data):
        product = self.clean_raw_product(scraped_data)
        if not self.is_duplicate(product):
            self.storage_queue.append(product)
            # Flush to disk once the queue reaches its limit.
            if (
                len(self.storage_queue) >= self.storage_queue_limit
                and not self.csv_file_open
            ):
                self.save_to_csv()

    def close_pipeline(self):
        # Give any in-progress write a moment to finish, then flush the rest.
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
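
# A minimal sketch of the pipeline on its own (hypothetical data): items queue
# up until storage_queue_limit is reached, duplicates are dropped by name, and
# close_pipeline() flushes whatever is left:
#
#   pipeline = ProductDataPipeline(csv_filename="test.csv", storage_queue_limit=2)
#   pipeline.add_product({"name": "A", "price": "Sale price£1.00", "url": "/a"})
#   pipeline.add_product({"name": "A", "price": "Sale price£1.00", "url": "/a"})  # dropped as duplicate
#   pipeline.add_product({"name": "B", "price": "Sale price£2.00", "url": "/b"})  # queue full, triggers a flush
#   pipeline.close_pipeline()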

list_of_urls = [
    "https://www.chocolate.co.uk/collections/all",
]

# Scraping Function
def start_scrape():
    # Loop through the list of URLs; pagination appends new pages to this
    # list while we iterate, which is safe for appends to a Python list.
    for url in list_of_urls:
        # Send Request
        response = requests.get(url)
        if response.status_code == 200:
            # Parse Data
            soup = BeautifulSoup(response.content, "html.parser")
            products = soup.select("product-item")
            for product in products:
                name = product.select("a.product-item-meta__title")[0].get_text()
                price = (
                    product.select("span.price")[0]
                    .get_text()
                    .replace("\nSale price£", "")
                )
                # Renamed from "url" to avoid shadowing the loop variable.
                product_url = product.select("div.product-item-meta a")[0]["href"]
                # Add To Data Pipeline
                data_pipeline.add_product(
                    {"name": name, "price": price, "url": product_url}
                )
            # Next Page
            next_page = soup.select('a[rel="next"]')
            if len(next_page) > 0:
                list_of_urls.append(
                    "https://www.chocolate.co.uk" + next_page[0]["href"]
                )

if __name__ == "__main__":
    data_pipeline = ProductDataPipeline(csv_filename="product_data.csv")
    start_scrape()
    data_pipeline.close_pipeline()
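
Running the script produces product_data.csv with the columns name, price_gb, price_usd, and url; rows are appended in batches of five (the default storage_queue_limit), and any remainder is written when the pipeline is closed.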