"""Get the Facebook page IDs for a set of facebook URLs.
Uses Playwright to emulate me going to the page and looking at the page ID in the
actual HTML. I haven't found a more programmatic way to do this without
more complicated developer signup and API keys.
Uses cookies for authentication, as described in
https://github.com/kevinzg/facebook-scraper/blob/392be1eabb43ed301fb7d5c3fd6e10318d26ac27/README.md
"""
from __future__ import annotations

import asyncio
import json
import logging
import re
from typing import Iterable, NamedTuple, Protocol
from urllib.parse import parse_qs, urlencode, urlparse

from playwright.async_api import BrowserContext, Page, TimeoutError, async_playwright

logger = logging.getLogger(__name__)


def get_page_ids(urls: Iterable[str], options: ScrapeOptions) -> list[PageInfo]:
    """Get the page IDs for a set of Facebook URLs."""
    return asyncio.run(_get_page_ids_async(urls, options))
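
# Example usage (a sketch; "cookies.json" is a hypothetical path to cookies
# exported from a logged-in browser session, as in the README linked above):
#
#     infos = get_page_ids(
#         ["https://www.facebook.com/SomePage"],
#         ScrapeOptions(limit=5, headless=True, cookies="cookies.json"),
#     )
#     for info in infos:
#         print(info.url, info.page, info.error)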


class FBPage(Protocol):
    @property
    def page_id(self) -> int | None:
        ...

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(page_id={self.page_id})"


class DeletedFBPage(FBPage):
    page_id = None


class PersonalFBPage(FBPage):
    page_id = None


class GroupFBPage(FBPage):
    page_id = None


class WithIdFBPage(FBPage):
    def __init__(self, page_id: int):
        self._page_id = page_id

    @property
    def page_id(self) -> int:
        return self._page_id


class PageInfo(NamedTuple):
    url: str
    page: FBPage | None
    error: Exception | None


class ScrapeOptions(NamedTuple):
    limit: int
    headless: bool
    cookies: list[dict[str, str]] | str | None
    """See the README linked above for how to get cookies."""


async def _get_page_ids_async(
    urls: Iterable[str],
    options: ScrapeOptions,
):
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=options.headless,
            slow_mo=100,
        )
        context = await browser.new_context()
        await context.add_cookies(_get_cookies(options.cookies))

        async def f(url):
            try:
                page = await _get_fb_page(url, context)
                return PageInfo(url, page, None)
            except Exception as e:
                return PageInfo(url, None, e)

        tasks = (f(url) for url in urls)
        results = []
        async for task in _limit_concurrency(tasks, options.limit):
            results.append(await task)
        return results


def _get_transparency_urls(page_url_str: str) -> Iterable[str]:
    base_url = urlparse(page_url_str)
    path = base_url.path.removesuffix("/") + "/about_profile_transparency"
    yield base_url._replace(path=path).geturl()
    queries: dict = parse_qs(base_url.query)
    queries["sk"] = "about_profile_transparency"
    new_qs = urlencode(queries, doseq=True)
    yield base_url._replace(query=new_qs).geturl()
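
# For example (a sketch of the two candidate URLs this yields, assuming a
# bare page URL with no existing query string):
#     _get_transparency_urls("https://www.facebook.com/SomePage") yields
#     "https://www.facebook.com/SomePage/about_profile_transparency" and
#     "https://www.facebook.com/SomePage?sk=about_profile_transparency"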


async def _find_page_id_from_url(page: Page, page_url_str: str) -> int:
    """Get page_id from /about_profile_transparency or &sk=about_profile_transparency.

    Only some pages have this.
    """
    failed_urls = []
    for url in _get_transparency_urls(page_url_str):
        try:
            return await _find_page_id_from_transparency_from_url(page, url)
        except ValueError:
            failed_urls.append(url)
    raise ValueError(f"Could not find page id for {page_url_str} at {failed_urls}")


async def _find_page_id_from_transparency_from_url(page: Page, url: str) -> int:
    await page.goto(url)
    # We don't need to close the opening "sign in" dialog:
    # await page.get_by_role("button", name="Close").click()
    # Find the first div whose text contains some digits and ends with "Page ID".
    locator = (
        page.locator("div").filter(has_text=re.compile(r"^.*\d+.*Page ID$")).nth(0)
    )
    try:
        await locator.wait_for(timeout=10 * 1000)
    except TimeoutError:
        raise ValueError(f"Could not find page id for {url}")
    # Extract the single run of digits (the page ID) from the matched text.
    pattern = re.compile(r"^\D*(\d+)\D*$")
    match = pattern.match(await locator.inner_text())
    if not match:
        raise ValueError(f"No page id found in transparency text at {url}")
    return int(match.group(1))


async def _locator_exists(page: Page, locator) -> bool:
    try:
        await locator.wait_for(timeout=10 * 1000)
        return True
    except TimeoutError:
        return False


async def _is_personal_page(page: Page, url: str) -> bool:
    """Check if the page is a personal page, which we can't get page IDs for."""
    # TODO: this is currently a bit broken. It returns True for pages like
    # https://www.facebook.com/andyak, which looks like a personal page and has
    # friends, but is a public-figure type page and so has a page ID if you go to
    # https://www.facebook.com/andyak/about_profile_transparency
    await page.goto(url)
    friends_tab = page.get_by_role("tab", name="Friends")
    add_friend_button = page.get_by_role("button", name="Add Friend")
    either = friends_tab.or_(add_friend_button).nth(0)
    return await _locator_exists(page, either)


async def _is_deleted_page(page: Page, url: str) -> bool:
    await page.goto(url)
    locator = page.get_by_text("This content isn't available right now").nth(0)
    return await _locator_exists(page, locator)


async def _do_get_fb_page(page: Page, url: str) -> FBPage:
    if "facebook.com/groups/" in url:
        # We can't get page IDs for groups.
        return GroupFBPage()
    # TODO: some urls are actually for private groups, so we can't find a page id;
    # others are for pages that don't have the transparency page for another
    # reason that I still need to figure out.
    try:
        page_id = await _find_page_id_from_url(page, url)
        return WithIdFBPage(page_id)
    except ValueError as e:
        if await _is_deleted_page(page, url):
            return DeletedFBPage()
        if await _is_personal_page(page, url):
            return PersonalFBPage()
        raise e


async def _get_fb_page(url: str, browser: BrowserContext) -> FBPage:
    page = await browser.new_page()
    try:
        result = await _do_get_fb_page(page, url)
        logger.info(f"For {url}, found page {result}")
        return result
    except Exception as e:
        logger.error(f"{e}")
        raise e
    finally:
        await page.close()


def _get_cookies(cookies) -> list[dict[str, str]]:
    if cookies is None:
        return []

    def fix_cookies(cookies):
        return [fix_cookie(cookie) for cookie in cookies]

    def fix_cookie(cookie):
        # Keep only the cookie fields we need; browser exports include extra keys.
        keys = {"name", "value", "domain", "path"}
        return {k: v for k, v in cookie.items() if k in keys}

    if isinstance(cookies, list):
        return fix_cookies(cookies)
    with open(cookies) as f:
        return fix_cookies(json.load(f))
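
# When `cookies` is a path, the file is expected to hold a JSON list of cookie
# objects, e.g. (a sketch; names and values are placeholders):
#     [{"name": "c_user", "value": "...", "domain": ".facebook.com", "path": "/"}]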


# From https://death.andgravity.com/limit-concurrency#aside-backpressure
# Only run a certain number of coroutines at a time.
async def _limit_concurrency(aws, limit: int):
    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False
    aws_ended = False
    pending = set()
    while pending or not aws_ended:
        # Top up the pending set until we hit the concurrency limit.
        while len(pending) < limit and not aws_ended:
            try:
                aw = await anext(aws) if is_async else next(aws)
            # An except clause takes any expression, so pick the end-of-iteration
            # exception matching the kind of iterator we actually have.
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))
        if not pending:
            return
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        while done:
            yield done.pop()
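

# A minimal smoke test of _limit_concurrency (a sketch added for illustration,
# not part of the original gist): run ten dummy coroutines, at most three at
# a time, and collect their results as they complete.
if __name__ == "__main__":
    async def _demo():
        async def work(i: int) -> int:
            await asyncio.sleep(0.01)
            return i

        results = []
        async for fut in _limit_concurrency((work(i) for i in range(10)), limit=3):
            results.append(await fut)
        print(sorted(results))  # expect [0, 1, 2, ..., 9]

    asyncio.run(_demo())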