This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests #3rd party module | |
from bs4 import BeautifulSoup | |
from datetime import datetime | |
from http.cookiejar import MozillaCookieJar | |
from amazoncaptcha import AmazonCaptcha | |
import traceback | |
import time | |
import os | |
import json | |
# Shared HTTP session. Cookies live in a Netscape/Mozilla-format file so that
# whatever Amazon sets (for example after a solved captcha) can survive a
# restart of the script.
session = requests.Session()
session.cookies = MozillaCookieJar('cookies.txt')
try:
    # The jar is saved after each product check, but it was never read back,
    # so every run used to start with an empty jar and the saved file was
    # effectively write-only.
    session.cookies.load()
except OSError:
    # No cookie file yet (first run) or an unreadable/corrupt one
    # (http.cookiejar.LoadError is an OSError subclass): start empty.
    pass

# Browser-like User-Agent header value sent with every request.
userAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"

# Directory (relative to the working directory) where notification JSON files
# are written.
OUTBOX_DIR = "../../outbox"

# Used as the "source" field of each notification and as its filename prefix.
SCRIPT_TITLE = "amazon-price-checker"
def _parse_price(text):
    """Convert a scraped price such as "$1,299.99" (or a plain number) to float.

    Raises ValueError when the text is not a single number.
    """
    # Thousands separators and the currency sign make float() fail otherwise.
    return float(str(text).strip().lstrip("$").replace(",", ""))


def notify(resp):
    """Write a notification JSON file into OUTBOX_DIR.

    resp may be:
      * a falsy value (None, "") -- nothing happens;
      * a str -- printed and passed through as the message content;
      * a (title, price, url, max) tuple -- an alert is written only when
        price <= max; price and max may be numbers or numeric strings.

    Any other value is reported on stdout and ignored. Returns None.
    Raises ValueError when the price (or max) cannot be parsed as a number.
    """
    if not resp:
        return

    # Build the message first so every "nothing to send" path returns before
    # the filesystem is touched.
    if isinstance(resp, str):
        # just pass the string through
        content = resp
        print(resp)
    elif isinstance(resp, tuple) and len(resp) == 4:
        name, price, url, maxPrice = resp
        price = _parse_price(price)
        if price > _parse_price(maxPrice):
            # too high... skip
            return
        content = f"Amazon price alert! ${price} {name} - {url}"
    else:
        # Previously this wrote a file with no "content" at all.
        print(f"notify: ignoring unsupported message {resp!r}")
        return

    # create a file with our message contents
    output = {"source": SCRIPT_TITLE, "content": content}
    os.makedirs(OUTBOX_DIR, exist_ok=True)
    # NOTE: the timestamp has one-second resolution, so two notifications
    # written within the same second share a filename (the later one wins).
    tstamp = time.strftime("%Y-%m-%d-%H-%M-%S")
    with open(f"{OUTBOX_DIR}{os.sep}{SCRIPT_TITLE}-{tstamp}.json", "w") as file:
        json.dump(output, file, indent=4)
    # or other notification method
# Seconds to wait on a single HTTP request before giving up; without a timeout
# a stalled connection would block the polling loop forever.
_REQUEST_TIMEOUT = 30

# Ids of the <span> elements tried for the displayed price, in lookup order.
_PRICE_SPAN_IDS = (
    'newBuyBoxPrice',
    'price_inside_buybox',
    'priceblock_ourprice',
    'priceblock_saleprice',
    'priceBlockStrikePriceString',
)


def _captcha_image_url(soup):
    """Return the captcha image URL if the page looks like a robot check, else None."""
    box = soup.find('div', class_='a-row a-text-center')
    img = box.find('img') if box is not None else None
    return img.get('src') if img is not None else None


def checkPrice(productURL, maxPrice = 9999):
    """Fetch an Amazon product page and scrape its title and price.

    Args:
        productURL: URL of the product page.
        maxPrice: price threshold; passed through unchanged in the result.

    Returns:
        (title, price, productURL, maxPrice), where title and price are
        stripped strings and price has its leading "$" removed, or None when
        no price element is found on the page.

    Raises:
        RuntimeError: the page has no product title element (for example an
            unsolved captcha page or a changed layout).
        requests.RequestException: the initial page request failed or timed
            out.
    """
    headers = {"User-Agent": userAgent}
    htmlPage = session.get(productURL, headers=headers, timeout=_REQUEST_TIMEOUT)
    soup1 = BeautifulSoup(htmlPage.content, 'html.parser')

    captcha = _captcha_image_url(soup1)
    if captcha:
        # amazon doesn't want us to be a robot, so we will now prove our humanity
        try:
            submitURL = f"https://www.amazon.com{soup1.find('form')['action']}"
            payload = {'field-keywords': AmazonCaptcha.fromlink(captcha).solve()}
            # Echo the form's hidden fields back; inputs without a name are
            # skipped and a missing value is sent as an empty string.
            for field in soup1.find_all('input', type='hidden'):
                if field.get('name'):
                    payload[field['name']] = field.get('value', '')
            htmlPage = session.get(submitURL, params=payload, headers=headers,
                                   timeout=_REQUEST_TIMEOUT)
            soup1 = BeautifulSoup(htmlPage.content, 'html.parser')
        except Exception:
            # Best effort: carry on with the page we already have.
            print("Skipping captcha solving")
            traceback.print_exc()

    # Persist cookies as soon as the network phase is over, so they are kept
    # even when parsing below fails.
    session.cookies.save()

    # Second parse of the prettified markup, kept from the original
    # implementation. NOTE: this does not execute any JavaScript; it only
    # re-parses the same HTML.
    soup2 = BeautifulSoup(soup1.prettify(), 'html.parser')

    # get product title
    titleTag = soup2.find('span', id='productTitle')
    if titleTag is None:
        raise RuntimeError(f"No product title found on {productURL}")
    productTitle = titleTag.getText().strip()
    print(productTitle)

    # get product price: first matching span wins
    price = next(
        (tag for tag in (soup2.find('span', id=spanId) for spanId in _PRICE_SPAN_IDS)
         if tag is not None),
        None,
    )
    if price is None:
        # no price?
        print("No price found / unavailable")
        return None

    productPriceStr = price.getText().strip()
    print(productPriceStr)
    return (productTitle, productPriceStr.lstrip("$"), productURL, maxPrice)
# (product URL, highest price that should still trigger an alert)
WATCHED_PRODUCTS = [
    ("https://www.amazon.com/gp/product/B08166SLDF", 311.99),
    ("https://www.amazon.com/gp/product/B07M6VMTRQ", 501.99),
]

# Pause between polling rounds, in seconds.
POLL_INTERVAL_SECONDS = 30


def main():
    """Poll every watched product forever, notifying on price hits and errors."""
    while True:
        for url, maxPrice in WATCHED_PRODUCTS:
            # Each product has its own try so one failing page does not skip
            # the remaining products in this round.
            try:
                notify(checkPrice(url, maxPrice))
            except Exception as e:
                traceback.print_exc()
                try:
                    notify(f"Hit an issue with one of the amazon price pages: {e}")
                except Exception:
                    # Reporting the problem failed too (e.g. the outbox is not
                    # writable); log it and keep polling.
                    traceback.print_exc()
        time.sleep(POLL_INTERVAL_SECONDS)


# Only start polling when run as a script, so the module can be imported
# without entering the infinite loop.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment