"""Get the Facebook page IDs for a set of facebook URLs. | |
Uses Playwright to emulate me going to the page and looking at the page ID in the | |
actual HTML. I haven't found a more programmatic way to do this without | |
more complicated developer signup and API keys. | |
Uses cookies for authentication, as described in | |
https://github.com/kevinzg/facebook-scraper/blob/392be1eabb43ed301fb7d5c3fd6e10318d26ac27/README.md | |
""" | |

from __future__ import annotations

import asyncio
import json
import logging
import re
from typing import Iterable, NamedTuple, Protocol
from urllib.parse import parse_qs, urlencode, urlparse

from playwright.async_api import BrowserContext, Page, TimeoutError, async_playwright

logger = logging.getLogger(__name__)


def get_page_ids(urls: Iterable[str], options: ScrapeOptions) -> list[PageInfo]:
    """Get the page ids for a set of facebook URLs."""
    return asyncio.run(_get_page_ids_async(urls, options))


class FBPage(Protocol):
    @property
    def page_id(self) -> int | None:
        ...

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(page_id={self.page_id})"


class DeletedFBPage(FBPage):
    page_id = None


class PersonalFBPage(FBPage):
    page_id = None


class GroupFBPage(FBPage):
    page_id = None


class WithIdFBPage(FBPage):
    def __init__(self, page_id: int):
        self._page_id = page_id

    @property
    def page_id(self) -> int:
        return self._page_id


class PageInfo(NamedTuple):
    url: str
    page: FBPage | None
    error: Exception | None


class ScrapeOptions(NamedTuple):
    limit: int
    headless: bool
    cookies: list[dict[str, str]] | str | None
    """See readme for how to get cookies."""


async def _get_page_ids_async(
    urls: Iterable[str],
    options: ScrapeOptions,
):
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=options.headless,
            slow_mo=100,
        )
        context = await browser.new_context()
        await context.add_cookies(_get_cookies(options.cookies))

        async def f(url):
            try:
                page = await _get_fb_page(url, context)
                return PageInfo(url, page, None)
            except Exception as e:
                return PageInfo(url, None, e)

        tasks = (f(url) for url in urls)
        results = []
        async for task in _limit_concurrency(tasks, options.limit):
            results.append(await task)
        return results


def _get_transparency_urls(page_url_str: str) -> Iterable[str]:
    base_url = urlparse(page_url_str)
    path = base_url.path.removesuffix("/") + "/about_profile_transparency"
    yield base_url._replace(path=path).geturl()
    queries: dict = parse_qs(base_url.query)
    queries["sk"] = "about_profile_transparency"
    new_qs = urlencode(queries, doseq=True)
    yield base_url._replace(query=new_qs).geturl()
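

# For illustration, given a hypothetical input like
# "https://www.facebook.com/SomePage", _get_transparency_urls yields:
#   https://www.facebook.com/SomePage/about_profile_transparency
#   https://www.facebook.com/SomePage?sk=about_profile_transparency
# The query-string form is there for URLs that already carry query parameters
# (e.g. profile.php?id=... style URLs), where appending a path segment wouldn't work.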


async def _find_page_id_from_url(page: Page, page_url_str: str) -> int:
    """Get page_id from /about_profile_transparency or &sk=about_profile_transparency
    Only some pages have this."""
    failed_urls = []
    for url in _get_transparency_urls(page_url_str):
        try:
            return await _find_page_id_from_transparency_from_url(page, url)
        except ValueError:
            failed_urls.append(url)
    raise ValueError(f"Could not find page id for {page_url_str} at {failed_urls}")


async def _find_page_id_from_transparency_from_url(page: Page, url: str) -> int:
    await page.goto(url)
    # We don't need to close the opening "sign in" dialog
    # await page.get_by_role("button", name="Close").click()
    locator = (
        page.locator("div").filter(has_text=re.compile(r"^.*\d+.*Page ID$")).nth(0)
    )
    try:
        await locator.wait_for(timeout=10 * 1000)
    except TimeoutError:
        raise ValueError(f"Could not find page id for {url}")
    pattern = re.compile(r"^\D*(\d+)\D*$")
    match = pattern.match(await locator.inner_text())
    if not match:
        raise ValueError("No match found")
    return int(match.group(1))


async def _locator_exists(page: Page, locator) -> bool:
    try:
        await locator.wait_for(timeout=10 * 1000)
        return True
    except TimeoutError:
        return False


async def _is_personal_page(page: Page, url: str) -> bool:
    """Check if the page is a personal page, which we can't get page ids for."""
    # TODO: this is currently a bit broken. It returns True for pages like
    # https://www.facebook.com/andyak, which looks like a personal page and has friends,
    # but is a public figure type and so has pageID if you go to
    # https://www.facebook.com/andyak/about_profile_transparency
    await page.goto(url)
    friends_tab = page.get_by_role("tab", name="Friends")
    add_friend_button = page.get_by_role("button", name="Add Friend")
    either = friends_tab.or_(add_friend_button).nth(0)
    return await _locator_exists(page, either)


async def _is_deleted_page(page: Page, url: str) -> bool:
    await page.goto(url)
    locator = page.get_by_text("This content isn't available right now").nth(0)
    return await _locator_exists(page, locator)


async def _do_get_fb_page(page: Page, url: str) -> FBPage:
    if "facebook.com/groups/" in url:
        # We can't get page ids for groups
        return GroupFBPage()
    # TODO: some urls are actually for private groups and so we can't find a page id
    # others are for pages that don't have the transparency page for another reason
    # that I need to figure out
    try:
        page_id = await _find_page_id_from_url(page, url)
        return WithIdFBPage(page_id)
    except ValueError as e:
        if await _is_deleted_page(page, url):
            return DeletedFBPage()
        if await _is_personal_page(page, url):
            return PersonalFBPage()
        raise e


async def _get_fb_page(url: str, browser: BrowserContext) -> FBPage:
    page = await browser.new_page()
    try:
        result = await _do_get_fb_page(page, url)
        logger.info(f"For {url}, found page {result}")
        return result
    except Exception as e:
        logger.error(f"{e}")
        raise e
    finally:
        await page.close()


def _get_cookies(cookies) -> list[dict[str, str]]:
    if cookies is None:
        return []

    def fix_cookies(cookies):
        return [fix_cookie(cookie) for cookie in cookies]

    def fix_cookie(cookie):
        keys = {"name", "value", "domain", "path"}
        return {k: v for k, v in cookie.items() if k in keys}

    if isinstance(cookies, list):
        return fix_cookies(cookies)
    with open(cookies) as f:
        return fix_cookies(json.load(f))


# From https://death.andgravity.com/limit-concurrency#aside-backpressure
# Only run a certain number of coroutines at a time
async def _limit_concurrency(aws, limit: int):
    try:
        aws = aiter(aws)
        is_async = True
    except TypeError:
        aws = iter(aws)
        is_async = False

    aws_ended = False
    pending = set()
    while pending or not aws_ended:
        while len(pending) < limit and not aws_ended:
            try:
                aw = await anext(aws) if is_async else next(aws)
            except StopAsyncIteration if is_async else StopIteration:
                aws_ended = True
            else:
                pending.add(asyncio.ensure_future(aw))

        if not pending:
            return

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            yield done.pop()
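

# A minimal usage sketch: the URLs below are placeholders, and "cookies.json" is an
# assumed path -- point it at a JSON export of your own Facebook cookies as described
# in the module docstring and the ScrapeOptions comment above.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    example_urls = [
        "https://www.facebook.com/some-page",          # hypothetical
        "https://www.facebook.com/groups/some-group",  # hypothetical
    ]
    options = ScrapeOptions(limit=4, headless=True, cookies="cookies.json")
    for info in get_page_ids(example_urls, options):
        print(info.url, info.page, info.error)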