A small script for pulling current Amazon bestseller rankings and dropping them into a JSON file.
#!/usr/bin/env python3
#
# A small script for scraping current Amazon bestseller rankings to JSON.
import json
import os
import time
from contextlib import suppress

import requests
from bs4 import BeautifulSoup
def build_url(domain='amazon.com', year=None, subcategory_n=None, page_n=None):
    """Builds an Amazon bestseller URL suitable for subsequent scraping."""
    if year:
        url = f'https://www.{domain}/gp/bestsellers/{year}/books'
    else:
        url = f'https://www.{domain}/gp/bestsellers/books'
    if subcategory_n:
        url += f'/{subcategory_n}'
    if page_n:
        url += f'/?pg={page_n}'
    return url
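
# A quick sketch of the URLs this helper produces (the subcategory number
# below is a placeholder, not a real Amazon browse-node ID from this gist):
#   build_url()                              -> https://www.amazon.com/gp/bestsellers/books
#   build_url(year=2020, page_n=2)           -> https://www.amazon.com/gp/bestsellers/2020/books/?pg=2
#   build_url(subcategory_n=12345, page_n=1) -> https://www.amazon.com/gp/bestsellers/books/12345/?pg=1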
def get_raw_page(url):
    """Returns the raw text of a URL if the GET request succeeds."""
    # Note: Amazon may throttle or block requests sent with the default
    # requests User-Agent; a browser-like header may be needed in practice.
    r = requests.get(url, timeout=30)
    print(f'{url} --- {r.status_code}')
    if r.ok:
        return r.text
def get_clean_ranked_list_from_scraped_page(scraped_page):
    """Processes a scraped page (ignoring all exceptions) to a ranked list."""
    soup = BeautifulSoup(scraped_page, 'lxml')
    raw_items = soup.find_all('li', attrs={'class': 'zg-item-immersion', 'role': 'gridcell'})
    ranked_list = []
    for item in raw_items:
        details = {'rank': None,
                   'title': None,
                   'author': None,
                   'title_link': None,
                   'book_type': None,
                   'raw_price': None,
                   'average_stars': None,
                   'n_reviews': None}
        # A single suppress covers every lookup below, so if one selector
        # misses, all later fields for that item are left as None.
        with suppress(Exception):
            details['rank'] = int(item.find('span', attrs={'class': 'zg-badge-text'}).text.replace('#', ''))
            details['title'] = item.find('a', attrs={'class': 'a-link-normal'}).text.strip()
            details['author'] = item.find('div', attrs={'class': 'a-row a-size-small'}).find('a', attrs={'class': 'a-size-small a-link-child'}).text
            details['title_link'] = item.find('a', attrs={'class': 'a-link-normal'}).attrs['href']
            details['book_type'] = item.find('span', attrs={'class': 'a-size-small a-color-secondary'}).text
            details['raw_price'] = item.find('span', attrs={'class': 'p13n-sc-price'}).text
            details['average_stars'] = item.find('div', attrs={'class': 'a-icon-row a-spacing-none'}).find('a', attrs={'class': 'a-link-normal'}).attrs['title']
            details['n_reviews'] = item.find('div', attrs={'class': 'a-icon-row a-spacing-none'}).find('a', attrs={'class': 'a-size-small a-link-normal'}).text
        ranked_list.append(details)
    return ranked_list
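
# Illustrative shape of one ranked_list entry (every value below is an
# invented placeholder, not data scraped from Amazon):
#   {'rank': 1, 'title': 'Example Title', 'author': 'A. Author',
#    'title_link': '/Example-Title/dp/0000000000', 'book_type': 'Hardcover',
#    'raw_price': '£9.99', 'average_stars': '4.5 out of 5 stars',
#    'n_reviews': '1,234'}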
def scrape_rankings(domain='amazon.com', year=None, subcategory_n=None):
    """Conducts a multi-page scrape and returns a dict of scrape results."""
    scrape_start_time = time.time()
    top_hundred = []
    # The bestseller listing spans two pages of 50 items each.
    for page_n in [1, 2]:
        url = build_url(domain, year, subcategory_n, page_n)
        raw = get_raw_page(url)
        if raw:
            ranked_list = get_clean_ranked_list_from_scraped_page(raw)
            if isinstance(ranked_list, list):
                top_hundred += ranked_list
    scrape_end_time = time.time()
    results = {'scrape_start_time': scrape_start_time,
               'scrape_end_time': scrape_end_time,
               'subcategory_number': subcategory_n,
               'rankings': top_hundred,
               'full_listing_collected': len(top_hundred) == 100,
               'base_domain': domain,
               'filter_year': year,
               'target_url': build_url(domain, year, subcategory_n)}
    return results
def construct_filename_from_scrape_dict(scrape_dict):
    """Constructs a sensible filename using available details in a scrape."""
    start_time = scrape_dict['scrape_start_time']
    base_domain = scrape_dict['base_domain']
    fname = f'{int(start_time)}_{base_domain.replace(".", "")}_bestseller_books_scrape'
    if scrape_dict['filter_year']:
        fname += f'_{scrape_dict["filter_year"]}'
    if scrape_dict['subcategory_number']:
        fname += f'_subcat_{scrape_dict["subcategory_number"]}'
    return fname + '.json'
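
# Example of a resulting filename (the timestamp and subcategory number are
# placeholder values): 1600000000_amazoncouk_bestseller_books_scrape_subcat_12345.json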
def main(domain='amazon.co.uk', year=None, subcategories=None):
    """Runs a minimal scrape and outputs the JSON result."""
    os.makedirs('./data', exist_ok=True)  # ensure the output directory exists
    # Treat "no subcategories" as a single scrape of the overall list.
    if not isinstance(subcategories, list):
        subcategories = [None]
    for subcategory_n in subcategories:
        rankings = scrape_rankings(domain, year, subcategory_n)
        if rankings:
            fname = construct_filename_from_scrape_dict(rankings)
            with open(os.path.join('./data', fname), 'w') as f:
                json.dump(rankings, f)


if __name__ == "__main__":
    main()
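
# Example invocations (the subcategory numbers are hypothetical placeholders;
# real Amazon browse-node IDs are not listed in this gist):
#   main()                                  # current overall amazon.co.uk list
#   main(domain='amazon.com', year=2020)    # the US 2020 annual bestsellers
#   main(subcategories=[12345, 67890])      # writes one JSON file per subcategory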