Skip to content

Instantly share code, notes, and snippets.

@Sergy08
Last active February 9, 2026 18:46
Show Gist options
  • Select an option

  • Save Sergy08/79d640c34f581d2ee551f0b7f809cea0 to your computer and use it in GitHub Desktop.

Select an option

Save Sergy08/79d640c34f581d2ee551f0b7f809cea0 to your computer and use it in GitHub Desktop.
amazon price tracker
import os
import time
import random
import re
import schedule
import smtplib
import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import datetime
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# =============================================================================
# Step 6: Automate the tracker (configuration that controls what gets tracked)
# =============================================================================

# When True, fetch_with_backoff prints the status, final URL and the first
# 250 characters of every response.
DEBUG = False

# Pool of browser User-Agent strings; one is picked at random per request.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
]

# Default file the price history rows are written to.
CSV_FILE = "price_history.csv"

# Step 6: Products to track. Each entry pairs a product URL with the price
# at or below which an alert email is sent.
# GoPro example included (ASIN: B0FCTZ1GLY)
PRODUCTS = [
    {
        "url": "https://www.amazon.com/dp/B0FCTZ1GLY",
        "target_price": Decimal("999.99"),
    },
    {
        "url": "https://www.amazon.com/dp/B0CL5KNB9M",
        "target_price": Decimal("999.99"),
    },
]

# Step 6: Interval, in hours, between scheduled tracker runs.
CHECK_EVERY_HOURS = 6

# Step 3: Best-effort locale/currency preference sent as cookies with each request.
COOKIES = {
    "i18n-prefs": "USD",
    "lc-main": "en_US",
}
# =============================================================================
# Step 3: Build the basic scraper (price parsing + HTML extraction)
# =============================================================================
def format_money(value: Decimal | None) -> str:
"""Format Decimal money consistently for display."""
if value is None:
return "N/A"
return str(value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))
def parse_price(text: str | None) -> Decimal | None:
"""
Step 3: Parse a price string into Decimal.
The goal is to take something like "$1,299.99" and turn it into Decimal("1299.99").
"""
if not text:
return None
# Pick the first number-like token found in the string
matches = re.findall(r"\d[\d.,]*", text)
if not matches:
return None
token = matches[0]
# Detect separators based on which appears last
last_dot = token.rfind(".")
last_comma = token.rfind(",")
if last_dot > last_comma:
decimal_sep = "."
thousands_sep = ","
else:
decimal_sep = ","
thousands_sep = "."
token = token.replace(thousands_sep, "")
if decimal_sep == ",":
token = token.replace(",", ".")
try:
return Decimal(token)
except (InvalidOperation, ValueError):
return None
def extract_price(soup: BeautifulSoup) -> Decimal | None:
    """
    Step 3: Extract the main product price from the page.

    Amazon markup changes, so we try the main price area first, then fall
    back to broader selectors. For each selector only the first matching
    element is considered; if its text does not parse, the next selector is
    tried.

    As a last resort the price is rebuilt from the separate "whole" and
    "fraction" spans. Returns None when nothing yields a parseable price.
    """
    selectors = [
        "#corePriceDisplay_desktop_feature_div span.a-offscreen",  # common modern layout
        "#corePrice_feature_div span.a-offscreen",
        "#apex_desktop span.a-offscreen",
        "#priceblock_ourprice, #priceblock_dealprice, #priceblock_saleprice",  # older IDs
        "span.a-offscreen",  # broad fallback, can be noisy on some pages
    ]
    for sel in selectors:
        el = soup.select_one(sel)
        if el is None:
            continue
        price = parse_price(el.get_text(" ", strip=True))
        if price is not None:
            return price

    # Last resort: whole + fraction (can be incomplete depending on layout)
    whole = soup.select_one("span.a-price-whole")
    if whole is None:
        return None

    # Keep digits only. The whole part holds no decimals, so any "." or ","
    # in its text is a grouping mark or the decimal mark itself; leaving it
    # in would build strings like "1,299..99" that never parse.
    whole_digits = re.sub(r"\D", "", whole.get_text(" ", strip=True))
    if not whole_digits:
        return None

    fraction = soup.select_one("span.a-price-fraction")
    frac_digits = (
        re.sub(r"\D", "", fraction.get_text(" ", strip=True))
        if fraction is not None
        else ""
    )
    # A missing or empty fraction is treated as zero cents.
    return parse_price(f"{whole_digits}.{frac_digits or '00'}")
# =============================================================================
# Step 7: Handle anti-bot measures (headers + backoff + retry)
# =============================================================================
def get_random_headers():
    """Build request headers with a randomly chosen User-Agent.

    Only the User-Agent varies between calls (drawn from USER_AGENTS); the
    remaining headers are fixed.
    """
    headers = {"User-Agent": random.choice(USER_AGENTS)}
    headers.update(
        {
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Connection": "keep-alive",
            "Referer": "https://www.amazon.com/",
            "Upgrade-Insecure-Requests": "1",
        }
    )
    return headers
def fetch_with_backoff(session: requests.Session, url: str, cookies=None, max_retries: int = 3):
    """
    Step 7: Request helper with jitter + backoff.

    Args:
        session: Session used to issue the GET request.
        url: Page to fetch.
        cookies: Optional cookies sent with the request (defaults to none).
        max_retries: Maximum number of attempts.

    Returns:
        The response when its status is 200, otherwise None. Statuses 429
        and 503 and request errors are retried with a growing wait; any
        other status gives up immediately.
    """
    cookies = cookies or {}
    for attempt in range(1, max_retries + 1):
        is_last_attempt = attempt == max_retries

        # jitter before each request
        time.sleep(random.uniform(2, 7))

        try:
            resp = session.get(
                url,
                headers=get_random_headers(),
                cookies=cookies,
                timeout=20,
                allow_redirects=True,
            )
        except requests.RequestException as e:
            if is_last_attempt:
                # Nothing follows this attempt, so waiting would only delay
                # reporting the failure.
                print(f"Request error: {e}. No retries left.")
            else:
                wait = 5 * attempt + random.uniform(0, 3)
                print(f"Request error: {e}. Waiting {wait:.1f}s, retry {attempt}/{max_retries}")
                time.sleep(wait)
            continue

        if DEBUG:
            print("DEBUG status:", resp.status_code)
            print("DEBUG final_url:", resp.url)
            snippet = resp.text[:250].replace("\n", " ").replace("\r", " ")
            print("DEBUG first_250:", snippet)

        if resp.status_code == 200:
            return resp

        # 429 is clear throttling, 503 can also be interstitial/bot checks/temporary issues
        if resp.status_code in (429, 503):
            if is_last_attempt:
                print(
                    f"Got {resp.status_code} (temporary block or interstitial). "
                    "No retries left."
                )
            else:
                wait = 5 * attempt + random.uniform(0, 3)
                print(
                    f"Got {resp.status_code} (temporary block or interstitial). "
                    f"Waiting {wait:.1f}s, retry {attempt}/{max_retries}"
                )
                time.sleep(wait)
            continue

        print(f"Got status code {resp.status_code}. Skipping.")
        return None

    print("All retries failed")
    return None
# =============================================================================
# Step 3: Build the basic scraper (fetch page, parse title + price)
# =============================================================================
def get_product_data(url: str):
    """
    Step 3: Fetch the product page and extract the data we care about.

    Returns:
        A dict with keys "title", "price" and "url", or None when the page
        could not be fetched or the parsed price fails the sanity check.
        "title" is "Unknown" when the title element is absent, and "price"
        is None when no price could be extracted.
    """
    # The context manager closes the session and releases its pooled
    # connections once the page has been downloaded.
    with requests.Session() as session:
        response = fetch_with_backoff(session, url, cookies=COOKIES, max_retries=3)
        if not response:
            return None
        html = response.content

    soup = BeautifulSoup(html, "html.parser")
    title_el = soup.find("span", {"id": "productTitle"})
    title = title_el.get_text(strip=True) if title_el else "Unknown"
    price = extract_price(soup)

    # simple sanity check so you avoid writing obvious junk
    if price is not None and price > Decimal("100000"):
        return None

    return {"title": title, "price": price, "url": url}
# =============================================================================
# Step 4: Store price history (CSV)
# =============================================================================
def save_price(data: dict, filename: str = CSV_FILE):
    """
    Step 4: Append a (timestamp, title, price, url) row into a CSV file.

    The row is appended without reading the existing file back. Rows already
    on disk are therefore never re-parsed, so stored prices keep their exact
    text (e.g. "999.90") instead of being converted to floats, and each save
    costs the same regardless of how large the history has grown.

    Args:
        data: Product dict with "title", "price" and "url". Nothing is
            written when it is empty or its price is None.
        filename: CSV path; created with a header row when it does not exist
            or is empty.
    """
    if not data or data.get("price") is None:
        print("No valid price to save.")
        return

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = {
        "timestamp": timestamp,
        "title": data["title"],
        "price": str(data["price"]),  # store as string to keep Decimal accuracy
        "url": data["url"],
    }

    # Write the header only when starting a new (or zero-byte) file.
    needs_header = not os.path.exists(filename) or os.path.getsize(filename) == 0
    pd.DataFrame([entry]).to_csv(filename, mode="a", header=needs_header, index=False)

    print(f"[{timestamp}] {data['title'][:50]}: ${format_money(data['price'])}")
# =============================================================================
# Step 5: Add price drop alerts (Gmail SMTP)
# =============================================================================
def send_email_alert(product_data: dict, target_price: Decimal) -> bool:
    """
    Step 5: Send a price alert email for a tracked product.

    This function does not compare the price against the target itself; the
    caller decides when an alert is due.

    Requires env vars: EMAIL_SENDER, EMAIL_PASSWORD (App Password), EMAIL_RECIPIENT

    Args:
        product_data: Dict with "title", "price" and "url".
        target_price: The price threshold shown in the email.

    Returns:
        True when the email was handed to the SMTP server; False when the
        credentials are missing, the price is None, or sending failed.
    """
    sender = os.environ.get("EMAIL_SENDER")
    password = os.environ.get("EMAIL_PASSWORD")
    recipient = os.environ.get("EMAIL_RECIPIENT")
    if not all([sender, password, recipient]):
        print("Email credentials not configured. Skipping alert.")
        return False

    current_price = product_data.get("price")
    if current_price is None:
        return False

    current_price = Decimal(str(current_price))
    target_price = Decimal(str(target_price))
    savings = target_price - current_price

    # The title is scraped from a web page. Collapse all whitespace so no
    # CR/LF can end up inside the Subject header.
    subject_title = " ".join(str(product_data["title"]).split())
    subject = f"Price Drop Alert: {subject_title[:50]}"

    body = "\n".join(
        [
            "Good news! A product you're tracking dropped below your target price.",
            f"Product: {product_data['title']}",
            f"Current Price: ${format_money(current_price)}",
            f"Target Price: ${format_money(target_price)}",
            f"You Save: ${format_money(savings)}",
            f"Buy now: {product_data['url']}",
        ]
    )

    msg = MIMEMultipart()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))

    try:
        # The timeout keeps an unreachable mail server from blocking the
        # tracker loop indefinitely.
        with smtplib.SMTP_SSL("smtp.gmail.com", 465, timeout=30) as server:
            server.login(sender, password)
            server.sendmail(sender, recipient, msg.as_string())
    except Exception as e:
        # Best-effort boundary: a failed alert is reported, never raised.
        print(f"Failed to send email: {e}")
        return False

    print(f"Alert sent to {recipient}")
    return True
def check_price_and_alert(url: str, target_price: Decimal) -> bool:
    """
    Step 5: Fetch one product, record its price, and alert when it is at or
    below the target.

    Returns the result of send_email_alert when an alert is attempted, and
    False when the price could not be fetched or is above the target.
    """
    product_data = get_product_data(url)
    price_missing = not product_data or product_data.get("price") is None
    if price_missing:
        print(f"Could not fetch price for {url}")
        return False

    save_price(product_data)

    current_price = Decimal(str(product_data["price"]))
    target_price = Decimal(str(target_price))

    if current_price > target_price:
        print(f"Current price ${format_money(current_price)} is above target ${format_money(target_price)}")
        return False

    return send_email_alert(product_data, target_price)
# =============================================================================
# Step 6: Automate the tracker (run loop + schedule)
# =============================================================================
def run_tracker():
    """
    Step 6: Check every entry in PRODUCTS once.

    A failure on one product is printed and does not stop the pass; a random
    pause follows each product.
    """
    started_at = datetime.now()
    print(f"\n{'=' * 50}")
    print(f"Price check started at {started_at.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'=' * 50}")

    for product in PRODUCTS:
        try:
            check_price_and_alert(product["url"], product["target_price"])
        except Exception as e:
            print(f"Error checking {product['url']}: {e}")

        # extra delay between products
        pause_seconds = random.uniform(3, 8)
        time.sleep(pause_seconds)

    print("Price check complete.\n")
# =============================================================================
# Step 10: Put it all together (entry point)
# =============================================================================
if __name__ == "__main__":
    print("Starting Amazon Price Tracker...")
    try:
        # Step 6: run immediately on start
        run_tracker()

        # Step 6: schedule future runs
        schedule.every(CHECK_EVERY_HOURS).hours.do(run_tracker)
        print("Tracker is running. Press Ctrl+C to stop.")

        # Poll once a minute for a due job.
        while True:
            schedule.run_pending()
            time.sleep(60)
    except KeyboardInterrupt:
        # Ctrl+C is the advertised way to stop; exit without a traceback.
        print("\nTracker stopped.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment