@triposat
Created December 23, 2023 07:25
Python Beginner Series Part 2

import csv
import os
import time
from dataclasses import InitVar, asdict, dataclass, field, fields

import requests
from bs4 import BeautifulSoup


@dataclass
class Product:
    """A single scraped product with a cleaned name, prices, and an absolute URL."""

    name: str = ""
    price_string: InitVar[str] = ""  # raw price text; consumed by __post_init__, not stored
    price_gb: float = field(init=False)
    price_usd: float = field(init=False)
    url: str = ""

    def __post_init__(self, price_string):
        self.name = self.clean_name()
        self.price_gb = self.clean_price(price_string)
        self.price_usd = self.convert_price_to_usd()
        self.url = self.create_absolute_url()

    def clean_name(self):
        if self.name == "":
            return "missing"
        return self.name.strip()

    def clean_price(self, price_string):
        # Strip the site's "Sale price" prefixes, leaving just the number
        price_string = price_string.strip()
        price_string = price_string.replace("Sale price£", "")
        price_string = price_string.replace("Sale priceFrom £", "")
        if price_string == "":
            return 0.0
        return float(price_string)

    def convert_price_to_usd(self):
        # Fixed GBP -> USD conversion rate used throughout this tutorial
        return self.price_gb * 1.21

    def create_absolute_url(self):
        if self.url == "":
            return "missing"
        return "https://www.chocolate.co.uk" + self.url


class ProductDataPipeline:
    """Cleans, deduplicates, and batches scraped products before writing them to CSV."""

    def __init__(self, csv_filename="", storage_queue_limit=5):
        self.names_seen = []
        self.storage_queue = []
        self.storage_queue_limit = storage_queue_limit
        self.csv_filename = csv_filename
        self.csv_file_open = False

    def save_to_csv(self):
        self.csv_file_open = True
        products_to_save = []
        products_to_save.extend(self.storage_queue)
        self.storage_queue.clear()
        if not products_to_save:
            self.csv_file_open = False  # nothing to write; release the flag before returning
            return
        keys = [field.name for field in fields(products_to_save[0])]
        # Only write the header when the file is new or empty
        file_exists = (
            os.path.isfile(self.csv_filename) and os.path.getsize(self.csv_filename) > 0
        )
        with open(
            self.csv_filename, mode="a", newline="", encoding="utf-8"
        ) as output_file:
            writer = csv.DictWriter(output_file, fieldnames=keys)
            if not file_exists:
                writer.writeheader()
            for product in products_to_save:
                writer.writerow(asdict(product))
        self.csv_file_open = False

    def clean_raw_product(self, scraped_data):
        return Product(
            name=scraped_data.get("name", ""),
            price_string=scraped_data.get("price", ""),
            url=scraped_data.get("url", ""),
        )

    def is_duplicate(self, product_data):
        if product_data.name in self.names_seen:
            print(f"Duplicate item found: {product_data.name}. Item dropped.")
            return True
        self.names_seen.append(product_data.name)
        return False

    def add_product(self, scraped_data):
        product = self.clean_raw_product(scraped_data)
        if not self.is_duplicate(product):
            self.storage_queue.append(product)
            if (
                len(self.storage_queue) >= self.storage_queue_limit
                and not self.csv_file_open
            ):
                self.save_to_csv()

    def close_pipeline(self):
        # Give an in-progress save a moment to finish, then flush any remainder
        if self.csv_file_open:
            time.sleep(3)
        if len(self.storage_queue) > 0:
            self.save_to_csv()
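
# Illustrative usage (assumed, mirroring the behaviour above): with
# storage_queue_limit=2, the second non-duplicate add_product() call triggers a
# batch write, and close_pipeline() flushes whatever is still queued:
#   pipeline = ProductDataPipeline(csv_filename="demo.csv", storage_queue_limit=2)
#   pipeline.add_product({"name": "A", "price": "Sale price£1.00", "url": "/a"})
#   pipeline.add_product({"name": "B", "price": "Sale price£2.00", "url": "/b"})  # batch written here
#   pipeline.close_pipeline()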


list_of_urls = [
    "https://www.chocolate.co.uk/collections/all",
]


# Scraping Function
def start_scrape():
    # Loop through the list of URLs; pagination appends new pages as we go
    for url in list_of_urls:
        # Send request
        response = requests.get(url)
        if response.status_code == 200:
            # Parse data
            soup = BeautifulSoup(response.content, "html.parser")
            products = soup.select("product-item")
            for product in products:
                name = product.select("a.product-item-meta__title")[0].get_text()
                price = (
                    product.select("span.price")[0]
                    .get_text()
                    .replace("\nSale price£", "")
                )
                # Renamed from `url` to avoid shadowing the loop variable
                product_url = product.select("div.product-item-meta a")[0]["href"]
                # Add to the data pipeline
                data_pipeline.add_product(
                    {"name": name, "price": price, "url": product_url}
                )
            # Queue the next page, if there is one
            next_page = soup.select('a[rel="next"]')
            if len(next_page) > 0:
                list_of_urls.append(
                    "https://www.chocolate.co.uk" + next_page[0]["href"]
                )


if __name__ == "__main__":
    data_pipeline = ProductDataPipeline(csv_filename="product_data.csv")
    start_scrape()
    data_pipeline.close_pipeline()
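
    # Optional sanity check (not part of the original gist): read the CSV back
    # and print each row to confirm what the pipeline wrote; the column names
    # match the Product dataclass fields.
    with open("product_data.csv", newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            print(row["name"], row["price_gb"], row["price_usd"], row["url"])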