@hassanabidpk
Last active August 26, 2018 05:19
from bs4 import BeautifulSoup
import requests
import re
import csv
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
"""
example_url : https://www.amazon.com/s/ref=nb_sb_noss?url=search-alias%3Dbeauty&field-keywords=lipstick&page=1
Returns total number of recommended_items for a single searched item
<Every page has 48 search results>
Installation
1- Download chromedriver_mac64.zip from http://chromedriver.storage.googleapis.com/index.html?path=2.24/
2- Unzip and place in the same folder with code and note down the path.
3- Repace CHROME_WEBDRIVER_PATH with Path on your computer for chromedriver
How to Run (python 3)
$ python -m venv amazonvenv
$ source amazonvenv\Scripts\activate
$ pip install requests
$ pip install beautifulsoup4
$ pip install -U selenium
$ pip install html5lib
$ python crawling.py
"""
BASE_URL = "https://www.amazon.com/s/ref=nb_sb_noss_2"
KEYWORD = "lipstick"
headers = {'User-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36'}
CHROME_WEBDRIVER_PATH = '/Users/hassanabid/Documents/hassan/GDE_code/web_crawling_amazon/chromedriver'
START_PAGE = 1
END_PAGE = 2
def main():
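    """Entry point: read the first results page to estimate the total page
    count, then crawl pages START_PAGE..END_PAGE and collect recommended
    items for every product found on each page."""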
    payload_main = {'url': "search-alias%3Dbeauty", 'field-keywords': KEYWORD, "page": 1}
    r_amazon = requests.get(BASE_URL, params=payload_main, headers=headers)
    soup = BeautifulSoup(r_amazon.text, 'html5lib')
    count_raw = soup.select("#s-result-count")
    if count_raw:
        result = re.search(r'[\d]+.[\d]+.*[\d]+.[\d]+', count_raw[0].text)
        if result:
            total_pages_re = re.search(r'[\d]+,[\d]+', result.group())
            total_pages_count = int(total_pages_re.group().replace(",", "")) / 48
            print("Total_Page_Count : {}".format(int(total_pages_count)))
            for i in range(START_PAGE, END_PAGE + 1):
                item_links_raw = get_next_page(i)
                items_soup, items_links = get_items_soup(total_pages_count, item_links_raw)
                find_recommended_items(items_soup, items_links, i)
    else:
        print("nothing found for {}".format(r_amazon.url))
def get_next_page(page_no):
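    """Fetch one search results page and return the anchor tags that link to
    the individual product pages."""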
    payload_main = {'url': "search-alias%3Dbeauty", 'field-keywords': KEYWORD, "page": page_no}
    r_amazon = requests.get(BASE_URL, params=payload_main, headers=headers)
    soup = BeautifulSoup(r_amazon.text, 'html5lib')
    count_raw = soup.select("#s-result-count")
    item_links_raw = soup.select("#resultsCol .a-row .a-spacing-top-mini > a")
    print("Page No : {} - len(item_links) : {}".format(page_no, len(item_links_raw)))
    return item_links_raw
def get_items_soup(total_pages_count,item_links_raw):
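    """Download every product page linked from a results page and return the
    parsed BeautifulSoup objects together with the links that succeeded."""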
    items_soup = []
    valid_item_links = []
    for item in item_links_raw:
        single_item_link = item["href"]
        try:
            r_single_item = requests.get(single_item_link, headers=headers)
            soup = BeautifulSoup(r_single_item.text, "html5lib")
            items_soup.append(soup)
            print("fetched item no. {}".format(len(items_soup)))
            valid_item_links.append(single_item_link)
        except:
            print("couldn't find!")
    return items_soup, valid_item_links
def find_recommended_items(items_soup,item_links,page_no):
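    """For every crawled product page, extract title/brand/price/ASIN, drive
    the recommendation carousel with Selenium to collect extra links, fetch
    each recommended item's details, and write everything to CSV."""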
    key_items = 0
    titles = []
    brands = []
    prices = []
    asins = []
    recommended_items = {}
    for index, soup in enumerate(items_soup):
        main_title, brand, price, main_asin = find_title_brand_price(soup)
        if not main_title:
            # TODO: move this below and add a case when the title is not found (empty [] in dict)
            print("title not found for item :{} - link: {}".format(key_items, item_links[index]))
            continue
        else:
            key_items = index + 1
            titles.append(main_title)
            brands.append(brand)
            prices.append(price)
            asins.append(main_asin)
        recommended_items_raw = soup.select("#purchase-sims-feature .a-carousel-viewport li a")
        print("fetching recommended_items (count: {}) for {}".format(len(recommended_items_raw), key_items))
        rec_items_href = []
        rec_items_titles = []
        rec_items_brands = []
        rec_items_prices = []
        rec_items_asins = []
        recommended_items_raw_extended = initiate_webdriver(item_links[index], recommended_items_raw)
        for item in recommended_items_raw_extended:
            try:
                # print("item : {}".format(item["href"]))
                if not "product-reviews" in item["href"]:
                    rec_items_href.append(item["href"])
            except KeyError:
                print("no link found")
        print("found {} valid links in rec_items for {}".format(len(rec_items_href), key_items))
        # keep at most 30 links so every row fits the 30 rec_item columns written by writecsv()
        rec_items_href = rec_items_href[0:30]
        print("fetch title, brand, price and asin for rec items")
        for single_href in rec_items_href:
            try:
                r_single_rec_item = requests.get("https://www.amazon.com{}".format(single_href), headers=headers)
                soup = BeautifulSoup(r_single_rec_item.text, "html5lib")
                title, brand, price, asin = find_title_brand_price(soup)
                if title and brand:
                    if not title in rec_items_titles:
                        # print("title : {}".format(len(title)))
                        rec_items_titles.append(title)
                        rec_items_brands.append(brand)
                        rec_items_prices.append(price)
                        rec_items_asins.append(asin)
            except:
                print("couldn't fetch single_href :{}".format(single_href))
        items_dict = {"titles": rec_items_titles, "brands": rec_items_brands, "prices": rec_items_prices, "asins": rec_items_asins}
        recommended_items[main_asin] = items_dict
        print("recorded recommended items for key:{} - values:{}".format(main_asin, len(recommended_items)))
    # write to csv
    writecsv(titles, brands, prices, asins, recommended_items, page_no)
def find_title_brand_price(soup):
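    """Pull the product title, brand, price and ASIN out of a parsed product
    page; any field that cannot be found is returned as None."""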
    title = None
    brand = None
    price = None
    asin = None
    title_raw = soup.select("#productTitle")
    brand_raw = soup.select("#brand")
    price_raw = soup.select("#priceblock_ourprice")
    asin_raw = soup.select("#detail-bullets")
    max_recs = soup.select("#purchase-sims-feature span.a-carousel-page-max")
    if not (title_raw or brand_raw):
        return title, brand, price, asin
    else:
        try:
            title = title_raw[0].string.strip()
            brand = brand_raw[0].string.strip()
            if not asin_raw:
                asin_raw = soup.select("#productDetails_detailBullets_sections1 .a-size-base")
                asin = asin_raw[1].string.strip()
            else:
                asin = get_asin(asin_raw)
            if not price_raw:
                price_raw = soup.select("#priceblock_saleprice")
            price = price_raw[0].string.strip()
            return title, brand, price, asin
        except:
            return title, brand, price, asin
def get_asin(soup_raw):
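    """Extract the ASIN from the "#detail-bullets" block of a product page."""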
    asin_find = re.search(r'.*ASIN:.*[\w]{10}', soup_raw[0].text)
    if asin_find:
        # e.g. "ASIN: B01HRNEHRE"
        return asin_find.group(0).split(" ")[1]
    else:
        print("asin not found :(")
        return None
def writecsv(titles,brands,prices,asins,rec_items,page_no):
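    """Write one CSV file per results page (amazon_<page_no>.csv): one row per
    main item, followed by name/brand/price/ASIN columns for up to 30
    recommended items."""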
print("start writing {} rec_items to csv".format(len(rec_items)))
rec_items_name = ["rec_item{}_name".format(i) for i in range(1,81) ]
rec_items_brands = ["rec_item{}_brand".format(i) for i in range(1,81) ]
rec_items_price = ["rec_item{}_price".format(i) for i in range(1,81) ]
rec_items_asin = ["rec_item{}_asin".format(i) for i in range(1,81) ]
rec_items_fieldnames = create_rec_items_header(rec_items_name,rec_items_brands,rec_items_price,rec_items_asin)
fieldnames = ['item_num','item_name', 'item_brand','item_price','item_asin']
fieldnames.extend(rec_items_fieldnames)
for key in rec_items:
print("key : {}".format(key))
with open('amazon_'+str(page_no)+'.csv', 'a', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for index_main, title in enumerate(titles):
rec_items_one = rec_items.get(asins[index_main])
rec_items_titles_list = rec_items_one.get("titles")
rec_items_brands_list = rec_items_one.get("brands")
rec_items_prices_list = rec_items_one.get("prices")
rec_items_asins_list = rec_items_one.get("asins")
row = {'item_num': str(index_main+1), 'item_name': title,'item_brand': brands[index_main],'item_price': prices[index_main],'item_asin': asins[index_main]}
for index, rec_title in enumerate(rec_items_titles_list):
row["rec_item{}_name".format(index+1)] = rec_title
row["rec_item{}_brand".format(index+1)] = rec_items_brands_list[index]
row["rec_item{}_price".format(index+1)] = rec_items_prices_list[index]
row["rec_item{}_asin".format(index+1)] = rec_items_asins_list[index]
print("writing row : {}".format(index_main+1))
writer.writerow(row)
def create_rec_items_header(names,brands,prices,asins):
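    """Interleave the generated rec_item*_name/brand/price/asin column names
    for the first 30 recommended-item slots into one fieldnames list."""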
    result = []
    for i in range(0, 30):
        result.append(names[i])
        result.append(brands[i])
        result.append(prices[i])
        result.append(asins[i])
    return result
def initiate_webdriver(href,item_raw_links):
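    """Open the product page in Chrome via Selenium and repeatedly click the
    recommendation carousel's "next" button, collecting the anchor tags from
    every carousel page. Falls back to the "#day0-sims-feature" carousel when
    "#purchase-sims-feature" has no next-page button."""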
print("initiate_webdriver for : {}".format(href))
result = []
result.extend(item_raw_links)
driver = webdriver.Chrome(CHROME_WEBDRIVER_PATH)
driver.implicitly_wait(10)
driver.get(href)
print("wait...... fetching data")
driver.execute_script("window.scrollTo(0, 1200);")
time.sleep(2)
next_button = None
element = None
second_case = False
try:
next_button = driver.find_element_by_css_selector("#purchase-sims-feature a.a-carousel-goto-nextpage")
except:
print("get next_button after scrolling to 3000px more!!")
second_case = True
driver.execute_script("window.scrollTo(0, 3000);")
time.sleep(2)
next_button = driver.find_element_by_css_selector("#day0-sims-feature a.a-carousel-goto-nextpage")
print("next_button_raw_soup : {}".format(next_button))
for i in range(6,80,5):
if len(result) > 95:
print("fetched 40 rec items")
break
try :
print("click executed for {}".format(i))
next_button.click()
time.sleep(3)
html = driver.page_source
soup = BeautifulSoup(html,"html5lib")
if second_case:
recommended_items_raw = soup.select("#day0-sims-feature .a-carousel-viewport li a")
else :
recommended_items_raw = soup.select("#purchase-sims-feature .a-carousel-viewport li a")
print("fetched recommended_items using webdriver: {}".format(len(recommended_items_raw)))
result.extend(recommended_items_raw)
time.sleep(2)
except:
print("next_button is click failed - so try again")
time.sleep(3)
try:
next_button = driver.find_element_by_css_selector("#purchase-sims-feature a.a-carousel-goto-nextpage")
except:
second_case = True
driver.execute_script("window.scrollTo(0, 3000);")
time.sleep(2)
next_button = driver.find_element_by_css_selector("#day0-sims-feature a.a-carousel-goto-nextpage")
driver.quit()
print("initiate_webdriver results : {}".format(len(result)))
return result
if __name__ == '__main__':
    main()