Script to scrape your FB groups
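The script below crawls a group's timeline through the mobile site (m.facebook.com), caches every fetched page under a local folder, and writes the parsed posts, comments and attachments to out.json. Before running it, paste a logged-in session cookie into headers['cookie'], set group_name and group_id, and point cache_folder at a writable directory; FOCUS_ID restricts the run to a single post, SINGLE stops after the first timeline page, and the USE_CACHE_* flags decide whether previously downloaded HTML is reused instead of re-fetched.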
import json
from pathlib import Path
from datetime import datetime
import frontmatter
import re, os
import string
from urllib.parse import urlparse, parse_qs, urldefrag
from urllib.error import HTTPError
import urllib.request
import urllib.error  # exception handlers below reference urllib.error.HTTPError / URLError directly
from bs4 import BeautifulSoup
import requests
headers = {
    'cookie': '',  # paste the cookie here (copied from a logged-in m.facebook.com session)
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:76.0) Gecko/20100101 Firefox/76.0'
}

group_name = ""  # set this
group_id = ""    # set this
cache_folder = Path("D:\\temp\\groups") / group_name  # probably set this as well
save_file = cache_folder / "out.json"
last_html = cache_folder / "last.html"

base_url = "https://m.facebook.com/groups/" + group_id
permalink_base = "https://m.facebook.com/groups/" + group_id + "?view=permalink&id="

FOCUS_ID = ""              # set to a single story id to process only that post
SINGLE = False             # stop after the first timeline page
USE_CACHE_COMMENTS = True  # reuse previously downloaded comment pages
USE_CACHE_MAIN = False     # reuse previously downloaded timeline pages
ME = ""                    # not used below
parser = "lxml"

# make sure the cache folders written to below exist
(cache_folder / "html" / "detail").mkdir(parents=True, exist_ok=True)
(cache_folder / "dls").mkdir(parents=True, exist_ok=True)
def get_text(soup):
    return ' '.join(soup.findAll(text=True))
def get_comments_detail(url, story_id, page=0, replyto=""):
    # fetch and parse the comments page for one story; recurses to follow
    # "previous comments" pagination and to collect reply threads
    out_html = cache_folder / "html" / "detail" / (story_id + "-" + str(page) + replyto + ".html")
    if out_html.exists() and USE_CACHE_COMMENTS:
        with out_html.open(encoding="UTF-8") as f:
            text = f.read()
    else:
        req = urllib.request.Request(url, headers=headers)
        try:
            with urllib.request.urlopen(req) as urlr:
                text = urlr.read()
            with out_html.open("w", encoding="UTF-8") as f:
                f.write("<!-- " + url + " //-->")
                f.write(text.decode('utf-8'))
        except urllib.error.HTTPError as e:
            return [{
                "error": str(e),
                "text": "Could not retrieve comments url",
                "url": url
            }]
    out_comments = []
    soup = BeautifulSoup(text, parser)
    selector = "#m_story_permalink_view > div:nth-child(2) > div > div:nth-child(5) > div"
    if len(replyto) > 0:
        selector = "#root > div > :nth-child(3) > div"
    comments = soup.select(selector)
    for c in comments:
        # print(c["id"])
        # only follow "previous" link
        if c["id"].startswith("see_prev"):
            for link in c.select("a"):
                prev_page_url = "https://m.facebook.com" + link["href"]
                out_comments.extend(get_comments_detail(prev_page_url, story_id, page + 1))
            continue
        if c["id"].startswith("comment_replies_more"):
            if c.text.find("previous replies") >= 0:
                for link in c.select("a"):
                    prev_page_url = "https://m.facebook.com" + link["href"]
                    out_comments.extend(get_comments_detail(prev_page_url, story_id, page + 1, replyto))
            continue
        if c["id"].startswith("see_next"):
            continue
        comment = {"id": c["id"]}
        for link in c.select("h3 a"):
            comment["poster"] = link.text
            comment["poster_url"] = link.get("href")
        # for a in c.contents[0].contents:
        #     try:
        #         print(a.name, a.text)
        #     except:
        #         print(str(a))
        comment["text"] = get_text(c.contents[0].contents[1])
        comment["attachments"] = []
        # get any attached image to comment
        for img in c.contents[0].select("a img"):
            img_url = img["src"]
            if not img_url.startswith("https://scontent"):
                continue
            u = urlparse(img_url)
            filename = u.path.split("/")[-1]
            download_to = cache_folder / "dls" / filename
            if not download_to.exists():
                try:
                    # download the image
                    opener = urllib.request.build_opener()
                    opener.addheaders = list(map(lambda x: (x, headers[x]), headers))
                    urllib.request.install_opener(opener)
                    urllib.request.urlretrieve(img_url, str(download_to))
                except urllib.error.HTTPError as e:
                    download_to = e
                except urllib.error.URLError as e:
                    download_to = e
            comment["attachments"].append({
                "url": img_url,
                "downloaded": str(download_to)
            })
        # meta row contains the date
        if len(c.contents[0].contents) >= 4:
            for abbr in c.contents[0].contents[3].select("abbr"):
                comment["date"] = abbr.text
        if len(replyto) > 0:
            comment["replyto"] = replyto
        # check if replies link exists
        if len(c.contents[0].contents) >= 5:
            for link in c.contents[0].contents[4].select("a"):
                replies_url = "https://m.facebook.com" + link["href"]
                comment["replies"] = get_comments_detail(replies_url, story_id, page, c["id"])
        out_comments.append(comment)
    return out_comments
def parse_container(child):
    # parse a single timeline <article> container into an item dict; recurses into
    # nested articles where needed and pulls in comments and attachments
    item = {"attachments": []}
    story_id = None
    comments_link = None
    # find the permalink via the comments anchor
    for link in child.select("a"):
        if link.text.find("Comment") >= 0:
            comments_link = link
            comments_link_url = "https://m.facebook.com" + link["href"]
            u = urlparse(comments_link_url)
            qparams = parse_qs(u.query)
            arr = qparams.get("fbid", qparams.get("story_fbid"))
            if arr is not None and len(arr) > 0:
                story_id = arr[0]
    item["metadata"] = {}
    if child.get("data-ft") is not None:
        item["metadata"] = json.loads(child["data-ft"])
    if story_id is None:
        story_id = item["metadata"].get("mf_story_key", item["metadata"].get("top_level_post_id"))
    if story_id is None:
        return []
    item["permalink"] = permalink_base + str(story_id)
    if len(FOCUS_ID) > 0 and story_id != FOCUS_ID:
        return []
    print(story_id)
    if story_id is None:
        # recursing
        items = []
        for article in child.select("article"):
            items.extend(parse_container(article))
        return items
    if child.text.find("for your birthday") >= 0:
        item["type"] = "bday"
        item["details"] = get_text(child)
        return [item]
    children2 = child.contents
    header = None
    body = None
    footer = None
    # print(child.prettify())
    print(len(children2))
    if len(children2) == 2:
        footer = children2[1]
        body = children2[0]
        if body.text.find("posted on your") >= 0 or body.text.find("posted from") >= 0 or body.text.find(" is playing ") >= 0:
            # recursing
            items = []
            for article in footer.select("article"):
                items.extend(parse_container(article))
            return items
    elif len(children2) == 3:
        header = children2[0]
        body = children2[1]
        footer = children2[2]
        if len(body.text.strip()) == 0:
            body = children2[0]
            header = None
        if body.text.find("was tagged") >= 0:
            body = footer.contents[0]
            footer = footer.contents[1]
    elif len(children2) == 1:
        print("ERROR")
        item["type"] = "error"
        item["desc"] = get_text(child)
        return [item]
    date_containers = footer.select("abbr")
    for dc in date_containers:
        item["date"] = dc.text
    actual_contents = body.contents
    if len(actual_contents) == 1:
        if header is not None and header.text.find(" is playing "):
            item["text"] = get_text(body)
            for link in body.select("a"):
                item["url"] = link["href"]
            items = []
            for article in footer.select("article"):
                items.extend(parse_container(article))
            item["subitems"] = items
            return [item]
    item["text"] = actual_contents[1].text
    for link in actual_contents[0].select("a"):
        item["poster_url"] = link.get("href", "?").split("?")[0]
        item["poster"] = link.text
        if len(link.text.strip()) > 0:
            break
    # get comments, but only if it's my content
    if comments_link is not None and not comments_link.text.startswith("Comment"):
        item["comments"] = get_comments_detail(comments_link_url, story_id)
    if len(actual_contents) > 2:
        print("Getting attachment")
        # attachment exists
        if "original_content_id" in item["metadata"] and len(actual_contents[2].select("h3 a")) > 0:
            # attachment is a repost
            item["repost"] = True
            for link in actual_contents[2].select("h3 a"):
                poster = link.text
                raw_poster_url = link["href"].split("?")[0]
                poster_url = "https://www.facebook.com" + raw_poster_url
                break  # only the first link is the poster
            all_details = []
            for link in actual_contents[2].select("a"):
                link_text = link.text
                url = link.get("href")
                if url is None:
                    continue
                raw_url = url.split("?")[0]
                # skip any links to the poster profile
                if link_text == poster or raw_url == raw_poster_url:
                    continue
                # compose the permalink
                u = urlparse(url)
                qparams = parse_qs(u.query)
                story_id = qparams.get("fbid", qparams.get("story_fbid"))
                if story_id is not None:
                    url = poster_url + "/posts/" + story_id[0]
                src_url = link["href"]
                if src_url.startswith("https://lm.facebook.com"):  # external link
                    u = urlparse(src_url)
                    qparams = parse_qs(u.query)
                    details = {
                        "url": qparams.get("u")
                    }
                else:
                    src_url = link["href"]
                    if not src_url.startswith("http"):
                        src_url = "https://m.facebook.com" + src_url
                    details = get_link_details(src_url)
                details["text"] = get_text(link)
                details["src_url"] = src_url
                all_details.append(details)
            item["attachments"].append({
                "type": "repost",
                "poster": poster,
                "poster_url": poster_url,
                "url": url,
                "desc": get_text(actual_contents[2]),
                "details": all_details
            })
        else:
            # attachment is photo or link
            for link in actual_contents[2].select("a"):
                url = link["href"]
                if url.startswith("/photo.php"):
                    # image attachment, find the image tag
                    imgs = link.select("img")
                    for img in imgs:
                        item["attachments"].append({
                            "type": "photo",
                            "url": img["src"],
                            "desc": img["alt"]
                        })
                    # TODO: Download the images if they're not from me
                if url.startswith("https://lm.facebook.com"):  # external link
                    u = urlparse(url)
                    qparams = parse_qs(u.query)
                    desc = ""
                    for h3 in actual_contents[2].select("h3"):
                        desc = h3.text
                    item["attachments"].append({
                        "type": "link",
                        "url": qparams["u"][0],
                        "desc": desc
                    })
            if len(item["attachments"]) == 0:
                img_url = ""
                for img in actual_contents[2].select("img"):
                    img_url = img["src"]
                if img_url.find("safe_image.php") >= 0:
                    u = urlparse(img_url)
                    qparams = parse_qs(u.query)
                    img_url = qparams["url"][0]
                u = urlparse(img_url)
                filename = u.path.split("/")[-1]
                download_to = cache_folder / "dls" / filename
                if not download_to.exists():
                    try:
                        # download the image
                        opener = urllib.request.build_opener()
                        opener.addheaders = list(map(lambda x: (x, headers[x]), headers))
                        urllib.request.install_opener(opener)
                        urllib.request.urlretrieve(img_url, str(download_to))
                    except urllib.error.HTTPError as e:
                        download_to = e
                    except urllib.error.URLError as e:
                        download_to = e
                item["attachments"].append({
                    "type": "unknown",
                    "url": "",
                    "img_url": img_url,
                    "downloaded": str(download_to),
                    "desc": get_text(actual_contents[2])
                })
    return [item]
def get_timeline():
    # walk the group timeline page by page, parse every story container,
    # and always dump whatever was collected into save_file on exit
    url_next = base_url
    items = []
    index = 0
    try:
        while len(url_next) > 0:
            index = index + 1
            out_html = cache_folder / "html" / (str(index) + ".html")
            next_link = ""  # initialised here so a page without a "See More Posts" link ends the loop
            if out_html.exists() and USE_CACHE_MAIN:
                with out_html.open(encoding="UTF-8") as f:
                    text = f.read()
            else:
                req = urllib.request.Request(url_next, headers=headers)
                with urllib.request.urlopen(req) as url:
                    text = url.read()
                with last_html.open("w", encoding="UTF-8") as f:
                    f.write(text.decode('utf-8'))
                with out_html.open("w", encoding="UTF-8") as f:
                    f.write("<!-- " + url_next + " //-->")
                    f.write(text.decode('utf-8'))
            soup = BeautifulSoup(text, parser)
            # print(soup)
            # break
            links = soup.find_all("a")
            for link in links:
                if link.text.find("See More Posts") >= 0:
                    next_link = "https://m.facebook.com" + link["href"]
            containers = soup.select("#m_group_stories_container > section > article")
            for child in containers:
                # print(json.dumps(item, indent=2))
                for item in parse_container(child):
                    if item is not None:
                        item["source_url"] = url_next
                        item["source_downloaded"] = str(out_html)
                        items.append(item)
            url_next = next_link
            print("#### " + url_next)
            if SINGLE:
                break
    # except Exception as e:
    #     print(e)
    finally:
        with save_file.open("w", encoding="UTF-8") as f:
            f.write(json.dumps(items, indent=2))
        print("Done")
def get_link_details(url):
    # follow an attachment link and classify it (video / photo / story / rate-limited),
    # downloading photos into the cache folder along the way
    # print("Getting link details")
    # print(url)
    if url.startswith("https://m.facebook.com/video_redirect/"):
        u = urlparse(url)
        qparams = parse_qs(u.query)
        return {
            "type": "video",
            "desc": "",
            "url": qparams.get("src")
        }
    req = urllib.request.Request(url, headers=headers)
    try:
        with urllib.request.urlopen(req) as urlr:
            text = urlr.read()
        soup = BeautifulSoup(text, parser)
        # print(soup.prettify())
        for i in soup.select("#MPhotoActionbar"):
            # indicates a photo
            mode = "PHOTO"
            desc = ""
            for d in soup.select("div.msg"):
                el = d.contents[2]
                # sometimes there's a space that gets returned as d.contents[2],
                # when that happens skip to next sibling
                while el.name is None:
                    el = el.next_sibling
                desc = get_text(el)
            imgs = soup.select("img.img")
            out_url = imgs[1]["src"]
            u = urlparse(out_url)
            filename = u.path.split("/")[-1]
            download_to = cache_folder / "dls" / filename
            if not download_to.exists():
                try:
                    # download the image
                    opener = urllib.request.build_opener()
                    opener.addheaders = list(map(lambda x: (x, headers[x]), headers))
                    urllib.request.install_opener(opener)
                    urllib.request.urlretrieve(out_url, str(download_to))
                except urllib.error.HTTPError as e:
                    download_to = e
            return {
                "type": "photo",
                "desc": desc,
                "url": out_url,
                "orig_url": url,
                "downloaded": str(download_to)
            }
        for div in soup.select("#m_story_permalink_view > div > div > div > div:nth-child(2)"):
            mode = "STORY"
            return {
                "desc": div.prettify(),
                "type": "story"
            }
        # print(soup.prettify())
        if soup.text.find("You can try again later.") >= 0:
            return {
                "type": "rate-limited",
                "url": url
            }
        return {
            "type": "ERROR",
            "url": url
        }
    except urllib.error.HTTPError as e:
        return {
            "type": "ERROR",
            "desc": str(e),
            "url": url
        }
def verify():
    # quick sanity check over the saved JSON: total item count, items per poster,
    # and how many items have type "repost"
    with save_file.open(encoding="UTF-8") as f:
        data = json.loads(f.read())
    print(len(data))
    posters = {}
    repost_count = 0
    for item in data:
        if item.get("type") == "repost":
            repost_count = repost_count + 1
        poster = item.get("poster", "")
        if poster not in posters:
            posters[poster] = 1
        else:
            posters[poster] = posters[poster] + 1
    print(json.dumps(posters, indent=2))
    print(repost_count)

# verify()
get_timeline()