@sumitpatel93
Created February 22, 2024 09:25
import argparse
import time
import json
import csv
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup as bs
with open('facebook_credentials.txt') as file:
    EMAIL = file.readline().split('"')[1]
    PASSWORD = file.readline().split('"')[1]
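# A minimal sketch of the expected facebook_credentials.txt, inferred from the
# split('"')[1] parsing above (the values shown are placeholders, not real credentials):
#   EMAIL = "you@example.com"
#   PASSWORD = "your-password"
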
def _extract_post_text(item):
    actualPosts = item.find_all(attrs={"data-testid": "post_message"})
    text = ""
    if actualPosts:
        for posts in actualPosts:
            paragraphs = posts.find_all('p')
            text = ""
            for index in range(0, len(paragraphs)):
                text += paragraphs[index].text
    return text

def _extract_link(item):
    postLinks = item.find_all(class_="_6ks")
    link = ""
    for postLink in postLinks:
        link = postLink.find('a').get('href')
    return link

def _extract_post_id(item):
    postIds = item.find_all(class_="_5pcq")
    post_id = ""
    for postId in postIds:
        post_id = f"https://www.facebook.com{postId.get('href')}"
    return post_id

def _extract_image(item):
    postPictures = item.find_all(class_="scaledImageFitWidth img")
    image = ""
    for postPicture in postPictures:
        image = postPicture.get('src')
    return image

def _extract_shares(item):
    postShares = item.find_all(class_="_4vn1")
    shares = ""
    for postShare in postShares:
        x = postShare.string
        if x is not None:
            x = x.split(">", 1)
            shares = x
        else:
            shares = "0"
    return shares

def _extract_comments(item):
    postComments = item.find_all("div", {"class": "_4eek"})
    comments = dict()
    # print(postDict)
    for comment in postComments:
        if comment.find(class_="_6qw4") is None:
            continue
        commenter = comment.find(class_="_6qw4").text
        comments[commenter] = dict()
        comment_text = comment.find("span", class_="_3l3x")
        if comment_text is not None:
            comments[commenter]["text"] = comment_text.text
        comment_link = comment.find(class_="_ns_")
        if comment_link is not None:
            comments[commenter]["link"] = comment_link.get("href")
        comment_pic = comment.find(class_="_2txe")
        if comment_pic is not None:
            comments[commenter]["image"] = comment_pic.find(class_="img").get("src")

    commentList = item.find('ul', {'class': '_7791'})
    if commentList:
        comments = dict()
        comment = commentList.find_all('li')
        if comment:
            for litag in comment:
                aria = litag.find("div", {"class": "_4eek"})
                if aria:
                    commenter = aria.find(class_="_6qw4").text
                    comments[commenter] = dict()
                    comment_text = litag.find("span", class_="_3l3x")
                    if comment_text:
                        comments[commenter]["text"] = comment_text.text
                    # print(str(litag)+"\n")
                    comment_link = litag.find(class_="_ns_")
                    if comment_link is not None:
                        comments[commenter]["link"] = comment_link.get("href")
                    comment_pic = litag.find(class_="_2txe")
                    if comment_pic is not None:
                        comments[commenter]["image"] = comment_pic.find(class_="img").get("src")
                    repliesList = litag.find(class_="_2h2j")
                    if repliesList:
                        reply = repliesList.find_all('li')
                        if reply:
                            comments[commenter]['reply'] = dict()
                            for litag2 in reply:
                                aria2 = litag2.find("div", {"class": "_4efk"})
                                if aria2:
                                    replier = aria2.find(class_="_6qw4").text
                                    if replier:
                                        comments[commenter]['reply'][replier] = dict()
                                        reply_text = litag2.find("span", class_="_3l3x")
                                        if reply_text:
                                            comments[commenter]['reply'][replier]["reply_text"] = reply_text.text
                                        r_link = litag2.find(class_="_ns_")
                                        if r_link is not None:
                                            comments[commenter]['reply']["link"] = r_link.get("href")
                                        r_pic = litag2.find(class_="_2txe")
                                        if r_pic is not None:
                                            comments[commenter]['reply']["image"] = r_pic.find(class_="img").get("src")
    return comments

def _extract_reaction(item):
    toolBar = item.find_all(attrs={"role": "toolbar"})
    if not toolBar:  # pretty fun
        return

    reactions = dict()
    for toolBar_child in toolBar[0].children:
        # e.g. "UFI2TopReactions/tooltip_LIKE" -> "LIKE"
        reaction_name = toolBar_child['data-testid'].split("UFI2TopReactions/tooltip_")[1]
        reactions[reaction_name] = 0
        for toolBar_child_child in toolBar_child.children:
            num = toolBar_child_child['aria-label'].split()[0]
            # fix weird ',' happening in some reaction values
            num = num.replace(',', '.')
            if 'K' in num:
                realNum = float(num[:-1]) * 1000
            else:
                realNum = float(num)
            reactions[reaction_name] = realNum
    return reactions

def _extract_html(bs_data):
    # Add to check
    with open('./bs.html', "w", encoding="utf-8") as file:
        file.write(str(bs_data.prettify()))

    k = bs_data.find_all(class_="_5pcr userContentWrapper")
    postBigDict = list()
    for item in k:
        postDict = dict()
        postDict['Post'] = _extract_post_text(item)
        postDict['Link'] = _extract_link(item)
        postDict['PostId'] = _extract_post_id(item)
        postDict['Image'] = _extract_image(item)
        postDict['Shares'] = _extract_shares(item)
        postDict['Comments'] = _extract_comments(item)
        # postDict['Reaction'] = _extract_reaction(item)

        # Add to check
        postBigDict.append(postDict)
        with open('./postBigDict.json', 'w', encoding='utf-8') as file:
            file.write(json.dumps(postBigDict, ensure_ascii=False).encode('utf-8').decode())

    return postBigDict

def _login(browser, email, password):
    browser.get("http://facebook.com")
    browser.maximize_window()
    browser.find_element("name", "email").send_keys(email)
    browser.find_element("name", "pass").send_keys(password)
    button = browser.find_element("name", 'login')
    button.send_keys()
    button.submit()
    time.sleep(5)

def _count_needed_scrolls(browser, infinite_scroll, numOfPost):
    if infinite_scroll:
        lenOfPage = browser.execute_script(
            "window.scrollTo(0, document.body.scrollHeight);var lenOfPage=document.body.scrollHeight;return lenOfPage;"
        )
    else:
        # roughly 8 post per scroll kindaOf
        lenOfPage = int(numOfPost / 8)
    print("Number Of Scrolls Needed " + str(lenOfPage))
    return lenOfPage

def _scroll(browser, infinite_scroll, lenOfPage):
    lastCount = -1
    match = False
    while not match:
        if infinite_scroll:
            lastCount = lenOfPage
        else:
            lastCount += 1

        # wait for the browser to load, this time can be changed slightly ~3 seconds with no difference, but 5 seems
        # to be stable enough
        time.sleep(5)

        if infinite_scroll:
            lenOfPage = browser.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);var lenOfPage=document.body.scrollHeight;return "
                "lenOfPage;")
        else:
            browser.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);var lenOfPage=document.body.scrollHeight;return "
                "lenOfPage;")
        if lastCount == lenOfPage:
            match = True

def extract(page, numOfPost, infinite_scroll=False, scrape_comment=False):
    option = Options()
    option.add_argument("--disable-infobars")
    option.add_argument("start-maximized")
    option.add_argument("--disable-extensions")
    # Pass the argument 1 to allow and 2 to block
    option.add_experimental_option("prefs", {
        "profile.default_content_setting_values.notifications": 1
    })

    # chromedriver should be in the same folder as file
    browser = webdriver.Chrome(options=option)
    _login(browser, EMAIL, PASSWORD)
    browser.get(page)

    lenOfPage = _count_needed_scrolls(browser, infinite_scroll, numOfPost)
    _scroll(browser, infinite_scroll, lenOfPage)

    # click on all the comments to scrape them all!
    # TODO: need to add more support for additional second level comments
    # TODO: ie. comment of a comment
    if scrape_comment:
        # first, uncollapse collapsed comments
        unCollapseCommentsButtonsXPath = '//a[contains(@class,"_666h")]'
        unCollapseCommentsButtons = browser.find_elements("xpath", unCollapseCommentsButtonsXPath)
        for unCollapseComment in unCollapseCommentsButtons:
            action = webdriver.common.action_chains.ActionChains(browser)
            try:
                # move to where the uncollapse button is
                action.move_to_element_with_offset(unCollapseComment, 5, 5)
                action.perform()
                unCollapseComment.click()
            except:
                # do nothing right here
                pass

        # second, set comment ranking to show all comments
        rankDropdowns = browser.find_elements("class name", '_2pln')  # select boxes that have rank dropdowns
        rankXPath = '//div[contains(concat(" ", @class, " "), "uiContextualLayerPositioner") and not(contains(concat(" ", @class, " "), "hidden_elem"))]//div/ul/li/a[@class="_54nc"]/span/span/div[@data-ordering="RANKED_UNFILTERED"]'
        for rankDropdown in rankDropdowns:
            # click to open the filter modal
            action = webdriver.common.action_chains.ActionChains(browser)
            try:
                action.move_to_element_with_offset(rankDropdown, 5, 5)
                action.perform()
                rankDropdown.click()
            except:
                pass

            # if the modal is opened, filter comments
            ranked_unfiltered = browser.find_elements("xpath", rankXPath)  # RANKED_UNFILTERED => (All Comments)
            if len(ranked_unfiltered) > 0:
                try:
                    ranked_unfiltered[0].click()
                except:
                    pass

        moreComments = browser.find_elements("xpath", '//a[@class="_4sxc _42ft"]')
        print("Scrolling through to click on more comments")
        while len(moreComments) != 0:
            for moreComment in moreComments:
                action = webdriver.common.action_chains.ActionChains(browser)
                try:
                    # move to where the comment button is
                    action.move_to_element_with_offset(moreComment, 5, 5)
                    action.perform()
                    moreComment.click()
                except:
                    # do nothing right here
                    pass
            moreComments = browser.find_elements("xpath", '//a[@class="_4sxc _42ft"]')

    # Now that the page is fully scrolled, grab the source code.
    source_data = browser.page_source

    # Throw your source into BeautifulSoup and start parsing!
    bs_data = bs(source_data, 'html.parser')

    postBigDict = _extract_html(bs_data)
    browser.close()

    return postBigDict

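# extract() can also be called directly instead of going through the CLI below; a
# minimal sketch (the page URL is a placeholder; chromedriver and valid credentials
# in facebook_credentials.txt are assumed to be available):
#   posts = extract("https://www.facebook.com/somepublicpage", numOfPost=20,
#                   infinite_scroll=False, scrape_comment=True)
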
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Facebook Page Scraper")
    required_parser = parser.add_argument_group("required arguments")
    required_parser.add_argument('-page', '-p', help="The Facebook Public Page you want to scrape", required=True)
    required_parser.add_argument('-len', '-l', help="Number of Posts you want to scrape", type=int, required=True)
    optional_parser = parser.add_argument_group("optional arguments")
    optional_parser.add_argument('-infinite', '-i',
                                 help="Scroll until the end of the page (1 = infinite) (Default is 0)", type=int,
                                 default=0)
    optional_parser.add_argument('-usage', '-u', help="What to do with the data: "
                                                      "Print on Screen (PS), "
                                                      "Write to Text File (WT), "
                                                      "or Write to CSV (CSV) (Default is CSV)", default="CSV")
    optional_parser.add_argument('-comments', '-c', help="Scrape ALL Comments of Posts (y/n) (Default is n). When "
                                                         "enabled for pages where there are a lot of comments it can "
                                                         "take a while", default="No")
    args = parser.parse_args()

    infinite = False
    if args.infinite == 1:
        infinite = True

    scrape_comment = False
    if args.comments == 'y':
        scrape_comment = True

    postBigDict = extract(page=args.page, numOfPost=args.len, infinite_scroll=infinite, scrape_comment=scrape_comment)

    # TODO: rewrite parser
    if args.usage == "WT":
        with open('output.txt', 'w') as file:
            for post in postBigDict:
                file.write(json.dumps(post))  # use json load to recover
    elif args.usage == "CSV":
        # newline='' keeps the csv writer from inserting blank rows on Windows
        with open('data.csv', 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            # writer.writerow(['Post', 'Link', 'Image', 'Comments', 'Reaction'])
            writer.writerow(['Post', 'Link', 'Image', 'Comments', 'Shares'])
            for post in postBigDict:
                writer.writerow([post['Post'], post['Link'], post['Image'], post['Comments'], post['Shares']])
                # writer.writerow([post['Post'], post['Link'], post['Image'], post['Comments'], post['Reaction']])
    else:
        for post in postBigDict:
            print(post)

    print("Finished")