Last active
February 9, 2026 18:46
-
-
Save Sergy08/79d640c34f581d2ee551f0b7f809cea0 to your computer and use it in GitHub Desktop.
Amazon price tracker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import time | |
| import random | |
| import re | |
| import schedule | |
| import smtplib | |
| import requests | |
| import pandas as pd | |
| from bs4 import BeautifulSoup | |
| from datetime import datetime | |
| from decimal import Decimal, InvalidOperation, ROUND_HALF_UP | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
# =============================================================================
# Step 6: Automate the tracker (configuration that controls what gets tracked)
# =============================================================================
# Debug switch: when True, fetch_with_backoff prints the HTTP status, the
# final URL, and the first 250 characters of each response body.
DEBUG = False  # set True only when you need to inspect responses

# Desktop-browser User-Agent strings; get_random_headers() picks one at
# random per request so traffic looks less repetitive (Step 7).
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
]

# CSV file that save_price() appends (timestamp, title, price, url) rows to.
CSV_FILE = "price_history.csv"

# Step 6: List the products you want to track.
# Each entry pairs a product URL with the Decimal target price at or below
# which an email alert fires. GoPro example included (ASIN: B0FCTZ1GLY).
PRODUCTS = [
    {"url": "https://www.amazon.com/dp/B0FCTZ1GLY", "target_price": Decimal("999.99")},
    {"url": "https://www.amazon.com/dp/B0CL5KNB9M", "target_price": Decimal("999.99")},
]

# Step 6: How often the tracker runs (hours between scheduled passes).
CHECK_EVERY_HOURS = 6

# Step 3: Best-effort locale/currency preference cookies sent with every
# request, nudging Amazon toward USD prices and English markup.
COOKIES = {
    "i18n-prefs": "USD",
    "lc-main": "en_US",
}
| # ============================================================================= | |
| # Step 3: Build the basic scraper (price parsing + HTML extraction) | |
| # ============================================================================= | |
| def format_money(value: Decimal | None) -> str: | |
| """Format Decimal money consistently for display.""" | |
| if value is None: | |
| return "N/A" | |
| return str(value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)) | |
| def parse_price(text: str | None) -> Decimal | None: | |
| """ | |
| Step 3: Parse a price string into Decimal. | |
| The goal is to take something like "$1,299.99" and turn it into Decimal("1299.99"). | |
| """ | |
| if not text: | |
| return None | |
| # Pick the first number-like token found in the string | |
| matches = re.findall(r"\d[\d.,]*", text) | |
| if not matches: | |
| return None | |
| token = matches[0] | |
| # Detect separators based on which appears last | |
| last_dot = token.rfind(".") | |
| last_comma = token.rfind(",") | |
| if last_dot > last_comma: | |
| decimal_sep = "." | |
| thousands_sep = "," | |
| else: | |
| decimal_sep = "," | |
| thousands_sep = "." | |
| token = token.replace(thousands_sep, "") | |
| if decimal_sep == ",": | |
| token = token.replace(",", ".") | |
| try: | |
| return Decimal(token) | |
| except (InvalidOperation, ValueError): | |
| return None | |
def extract_price(soup: BeautifulSoup) -> Decimal | None:
    """
    Step 3: Extract the main product price from a parsed Amazon page.

    Amazon's markup varies, so several selectors are tried from most to
    least specific; the whole/fraction pair is a last resort.
    """
    candidate_selectors = (
        "#corePriceDisplay_desktop_feature_div span.a-offscreen",  # common modern layout
        "#corePrice_feature_div span.a-offscreen",
        "#apex_desktop span.a-offscreen",
        "#priceblock_ourprice, #priceblock_dealprice, #priceblock_saleprice",  # older IDs
        "span.a-offscreen",  # broad fallback, can be noisy on some pages
    )
    for selector in candidate_selectors:
        node = soup.select_one(selector)
        if node is None:
            continue
        parsed = parse_price(node.get_text(" ", strip=True))
        if parsed is not None:
            return parsed

    # Last resort: stitch whole + fraction spans together (can be
    # incomplete depending on layout).
    whole_node = soup.select_one("span.a-price-whole")
    if not whole_node:
        return None
    fraction_node = soup.select_one("span.a-price-fraction")
    whole_digits = re.sub(r"[^\d,\.]", "", whole_node.get_text(" ", strip=True))
    frac_digits = re.sub(r"[^\d]", "", fraction_node.get_text(" ", strip=True)) if fraction_node else "00"
    return parse_price(f"{whole_digits}.{frac_digits}")
| # ============================================================================= | |
| # Step 7: Handle anti-bot measures (headers + backoff + retry) | |
| # ============================================================================= | |
def get_random_headers():
    """Build a browser-like header set with a randomly chosen User-Agent
    so successive requests look less repetitive."""
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Connection": "keep-alive",
        "Referer": "https://www.amazon.com/",
        "Upgrade-Insecure-Requests": "1",
    }
    return headers
def fetch_with_backoff(session: requests.Session, url: str, cookies=None, max_retries: int = 3):
    """
    Step 7: GET a URL with pre-request jitter and linear backoff on
    throttling responses.

    Retries on 429/503 and on connection-level errors; any other non-200
    status aborts immediately.  Returns the 200 response, or None when
    every attempt fails.
    """
    if cookies is None:
        cookies = {}
    attempt = 0
    while attempt < max_retries:
        attempt += 1
        # Random pause before each request so calls are not evenly spaced.
        time.sleep(random.uniform(2, 7))
        try:
            resp = session.get(
                url,
                headers=get_random_headers(),
                cookies=cookies,
                timeout=20,
                allow_redirects=True,
            )
        except requests.RequestException as e:
            wait = 5 * attempt + random.uniform(0, 3)
            print(f"Request error: {e}. Waiting {wait:.1f}s, retry {attempt}/{max_retries}")
            time.sleep(wait)
            continue
        if DEBUG:
            print("DEBUG status:", resp.status_code)
            print("DEBUG final_url:", resp.url)
            snippet = resp.text[:250].replace("\n", " ").replace("\r", " ")
            print("DEBUG first_250:", snippet)
        if resp.status_code == 200:
            return resp
        if resp.status_code not in (429, 503):
            # Anything else is treated as a hard failure for this URL.
            print(f"Got status code {resp.status_code}. Skipping.")
            return None
        # 429 is clear throttling; 503 can also be interstitial/bot
        # checks/temporary issues — back off and try again.
        wait = 5 * attempt + random.uniform(0, 3)
        print(
            f"Got {resp.status_code} (temporary block or interstitial). "
            f"Waiting {wait:.1f}s, retry {attempt}/{max_retries}"
        )
        time.sleep(wait)
    print("All retries failed")
    return None
| # ============================================================================= | |
| # Step 3: Build the basic scraper (fetch page, parse title + price) | |
| # ============================================================================= | |
def get_product_data(url: str):
    """
    Step 3: Fetch one product page and return {"title", "price", "url"}.

    Returns None when the page cannot be fetched, or when the parsed price
    is implausibly large (a parsing artifact rather than a real price).
    """
    session = requests.Session()
    response = fetch_with_backoff(session, url, cookies=COOKIES, max_retries=3)
    if not response:
        return None

    soup = BeautifulSoup(response.content, "html.parser")
    title = "Unknown"
    title_node = soup.find("span", {"id": "productTitle"})
    if title_node:
        title = title_node.get_text(strip=True)

    price = extract_price(soup)
    # Simple sanity check so obvious junk never reaches the CSV.
    if price is not None and price > Decimal("100000"):
        return None
    return {"title": title, "price": price, "url": url}
| # ============================================================================= | |
| # Step 4: Store price history (CSV) | |
| # ============================================================================= | |
def save_price(data: dict, filename: str = CSV_FILE):
    """
    Step 4: Append a (timestamp, title, price, url) row to the CSV history.

    Does nothing (beyond a notice) when `data` is falsy or has no price.
    The price is stored as a string so Decimal precision survives the
    round-trip through CSV.
    """
    if not data or data.get("price") is None:
        print("No valid price to save.")
        return
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = {
        "timestamp": timestamp,
        "title": data["title"],
        "price": str(data["price"]),  # store as string to keep Decimal accuracy
        "url": data["url"],
    }
    # Append a single row instead of re-reading the whole history each run
    # (the old read-concat-rewrite grew O(n) per check); write the header
    # only when the file is being created.
    write_header = not os.path.exists(filename)
    pd.DataFrame([entry]).to_csv(filename, mode="a", header=write_header, index=False)
    print(f"[{timestamp}] {data['title'][:50]}: ${format_money(data['price'])}")
| # ============================================================================= | |
| # Step 5: Add price drop alerts (Gmail SMTP) | |
| # ============================================================================= | |
def send_email_alert(product_data: dict, target_price: Decimal) -> bool:
    """
    Step 5: Email an alert when the current price is at or below the target.

    Credentials come from env vars EMAIL_SENDER, EMAIL_PASSWORD (Gmail App
    Password) and EMAIL_RECIPIENT; the alert is skipped if any is missing.
    Returns True only when the message was actually sent.
    """
    sender = os.environ.get("EMAIL_SENDER")
    password = os.environ.get("EMAIL_PASSWORD")
    recipient = os.environ.get("EMAIL_RECIPIENT")
    if not (sender and password and recipient):
        print("Email credentials not configured. Skipping alert.")
        return False

    current_price = product_data.get("price")
    if current_price is None:
        return False

    # Normalize both sides to Decimal before computing the savings.
    current_price = Decimal(str(current_price))
    target_price = Decimal(str(target_price))
    savings = target_price - current_price

    subject = f"Price Drop Alert: {product_data['title'][:50]}"
    body = f"""
Good news! A product you're tracking dropped below your target price.
Product: {product_data['title']}
Current Price: ${format_money(current_price)}
Target Price: ${format_money(target_price)}
You Save: ${format_money(savings)}
Buy now: {product_data['url']}
""".strip()

    message = MIMEMultipart()
    message["From"] = sender
    message["To"] = recipient
    message["Subject"] = subject
    message.attach(MIMEText(body, "plain"))

    try:
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(sender, password)
            server.sendmail(sender, recipient, message.as_string())
    except Exception as e:
        # Best-effort: report the failure instead of crashing the tracker.
        print(f"Failed to send email: {e}")
        return False
    print(f"Alert sent to {recipient}")
    return True
def check_price_and_alert(url: str, target_price: Decimal) -> bool:
    """
    Step 5: Fetch one product, record its price, and alert when at/below target.

    Returns True only when an alert email was sent.
    """
    product = get_product_data(url)
    if not product or product.get("price") is None:
        print(f"Could not fetch price for {url}")
        return False

    save_price(product)

    # Normalize both values to Decimal for a safe comparison.
    current_price = Decimal(str(product["price"]))
    target_price = Decimal(str(target_price))
    if current_price > target_price:
        print(f"Current price ${format_money(current_price)} is above target ${format_money(target_price)}")
        return False
    return send_email_alert(product, target_price)
| # ============================================================================= | |
| # Step 6: Automate the tracker (run loop + schedule) | |
| # ============================================================================= | |
def run_tracker():
    """
    Step 6: Run one full tracking pass over every entry in PRODUCTS.

    Each product is checked independently; a failure on one never stops
    the rest of the pass.
    """
    banner = "=" * 50
    print(f"\n{banner}")
    print(f"Price check started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{banner}")
    for item in PRODUCTS:
        try:
            check_price_and_alert(item["url"], item["target_price"])
        except Exception as e:
            print(f"Error checking {item['url']}: {e}")
        # Extra delay between products to space out requests.
        time.sleep(random.uniform(3, 8))
    print("Price check complete.\n")
| # ============================================================================= | |
| # Step 10: Put it all together (entry point) | |
| # ============================================================================= | |
def _main() -> None:
    """Step 10: Entry point — run once immediately, then on a fixed schedule."""
    print("Starting Amazon Price Tracker...")
    # Step 6: run immediately on start.
    run_tracker()
    # Step 6: schedule future runs every CHECK_EVERY_HOURS hours.
    schedule.every(CHECK_EVERY_HOURS).hours.do(run_tracker)
    print("Tracker is running. Press Ctrl+C to stop.")
    while True:
        schedule.run_pending()
        time.sleep(60)


if __name__ == "__main__":
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment