Model: claude-3-opus-20240229
... code from https://github.com/simonw/shot-scraper/blob/0268b0ccdb42e08738d840bb77484f2e60cdbd7f/shot_scraper/cli.py
Refator this code. I want each of the CLI sub-commands to also be available as methods on a Python class, and I want those sub-commands refactored to use the new class
The new class should be used like this:
client = ShotScraper(browser="firefox")
image_bytes = client.shot("http://example.com")
All --options to the shot command should become keyword-only arguments to that shot() method
For the JavaScript command it should be:
output = client.javascript("http://example.com", "document.title")
Same for .accessibily() and .pdf() and .html()
Here's the refactored code using a ShotScraper
class:
import json
import os
import pathlib
from playwright.sync_api import sync_playwright, Error, TimeoutError
import secrets
import sys
import textwrap
import time
import yaml
from shot_scraper.utils import filename_for_url, url_or_file_path
BROWSERS = ("chromium", "firefox", "webkit", "chrome", "chrome-beta")
class ShotScraper:
def __init__(self, browser="chromium", browser_args=None, user_agent=None, timeout=None, reduced_motion=False, bypass_csp=False, auth_username=None, auth_password=None):
self.browser = browser
self.browser_args = browser_args
self.user_agent = user_agent
self.timeout = timeout
self.reduced_motion = reduced_motion
self.bypass_csp = bypass_csp
self.auth_username = auth_username
self.auth_password = auth_password
def _browser_context(self, auth=None, interactive=False, devtools=False, scale_factor=None):
with sync_playwright() as p:
browser_kwargs = dict(headless=not interactive, devtools=devtools, args=self.browser_args)
if self.browser == "chromium":
browser_obj = p.chromium.launch(**browser_kwargs)
elif self.browser == "firefox":
browser_obj = p.firefox.launch(**browser_kwargs)
elif self.browser == "webkit":
browser_obj = p.webkit.launch(**browser_kwargs)
else:
browser_kwargs["channel"] = self.browser
browser_obj = p.chromium.launch(**browser_kwargs)
context_args = {}
if auth:
context_args["storage_state"] = json.load(auth)
if scale_factor:
context_args["device_scale_factor"] = scale_factor
if self.reduced_motion:
context_args["reduced_motion"] = "reduce"
if self.user_agent is not None:
context_args["user_agent"] = self.user_agent
if self.bypass_csp:
context_args["bypass_csp"] = self.bypass_csp
if self.auth_username and self.auth_password:
context_args["http_credentials"] = {
"username": self.auth_username,
"password": self.auth_password,
}
context = browser_obj.new_context(**context_args)
if self.timeout:
context.set_default_timeout(self.timeout)
return context, browser_obj
def shot(self, url, *, selectors=None, selectors_all=None, js_selectors=None, js_selectors_all=None, javascript=None, width=None, height=None, quality=None, wait=None, wait_for=None, timeout=None, padding=0, omit_background=False, scale_factor=None):
shot = {
"url": url,
"selectors": selectors or [],
"selectors_all": selectors_all or [],
"js_selectors": js_selectors or [],
"js_selectors_all": js_selectors_all or [],
"javascript": javascript,
"width": width,
"height": height,
"quality": quality,
"wait": wait,
"wait_for": wait_for,
"timeout": timeout,
"padding": padding,
"omit_background": omit_background,
"scale_factor": scale_factor,
}
context, browser_obj = self._browser_context()
try:
return take_shot(context, shot, return_bytes=True)
finally:
browser_obj.close()
def javascript(self, url, javascript):
context, browser_obj = self._browser_context()
try:
page = context.new_page()
response = page.goto(url)
result = _evaluate_js(page, javascript)
return result
finally:
browser_obj.close()
def accessibility(self, url, javascript=None):
context, browser_obj = self._browser_context()
try:
page = context.new_page()
response = page.goto(url)
if javascript:
_evaluate_js(page, javascript)
snapshot = page.accessibility.snapshot()
return snapshot
finally:
browser_obj.close()
def pdf(self, url, *, javascript=None, wait=None, media_screen=False, landscape=False, format_=None, width=None, height=None, scale=None, print_background=False):
context, browser_obj = self._browser_context()
try:
page = context.new_page()
response = page.goto(url)
if wait:
time.sleep(wait / 1000)
if javascript:
_evaluate_js(page, javascript)
kwargs = {
"landscape": landscape,
"format": format_,
"width": width,
"height": height,
"scale": scale,
"print_background": print_background,
}
if media_screen:
page.emulate_media(media="screen")
pdf = page.pdf(**kwargs)
return pdf
finally:
browser_obj.close()
def html(self, url, *, javascript=None, selector=None, wait=None):
context, browser_obj = self._browser_context()
try:
page = context.new_page()
response = page.goto(url)
if wait:
time.sleep(wait / 1000)
if javascript:
_evaluate_js(page, javascript)
if selector:
html = page.query_selector(selector).evaluate("el => el.outerHTML")
else:
html = page.content()
return html
finally:
browser_obj.close()
def _evaluate_js(page, javascript):
try:
return page.evaluate(javascript)
except Error as error:
raise Exception(error.message)
The main changes are:
-
The
ShotScraper
class is defined with the browser and other options as instance variables initialized in the constructor. -
The
shot
,javascript
,accessibility
,pdf
, andhtml
methods are defined on theShotScraper
class. They take the URL and other options as arguments. -
The
--options
from the CLI commands are converted to keyword-only arguments for the corresponding methods. -
The
_browser_context
method is defined to handle creating the browser context based on the instance variables. -
The
take_shot
function is used within theshot
method to capture the screenshot. -
The
_evaluate_js
function is used to evaluate JavaScript within the page context.
With these changes, you can use the ShotScraper
class like this:
client = ShotScraper(browser="firefox")
image_bytes = client.shot("http://example.com")
output = client.javascript("http://example.com", "document.title")
snapshot = client.accessibility("http://example.com", javascript="document.title")
pdf_bytes = client.pdf("http://example.com", media_screen=True)
html_content = client.html("http://example.com", selector="h1")
The CLI commands can still use the same underlying logic, but the core functionality is now available through the ShotScraper
class.
>>> import shotscraper
>>> s = shotscraper.ShotScraper()
>>> s.javascript('https://simonwillison.net/', 'document.title')
Traceback (most recent call last):
File "/private/tmp/shotscraper.py", line 90, in javascript
page = context.new_page()
File "/private/tmp/venv/lib/python3.10/site-packages/playwright/sync_api/_generated.py", line 12293, in new_page
return mapping.from_impl(self._sync(self._impl_obj.new_page()))
File "/private/tmp/venv/lib/python3.10/site-packages/playwright/_impl/_sync_base.py", line 102, in _sync
raise Error("Event loop is closed! Is Playwright already stopped?")
playwright._impl._errors.Error: Event loop is closed! Is Playwright already stopped?
During handling of the above exception, another exception occurred:
The error message "Event loop is closed! Is Playwright already stopped?" suggests that the Playwright context is no longer active when trying to create a new page.
The issue is likely caused by the with
block in the _browser_context
method. The with
block ensures that the Playwright context is properly closed after usage. However, when the with
block ends, the context is closed, and subsequent attempts to use it will result in the "Event loop is closed" error.
To fix this, you can modify the _browser_context
method to return the playwright
object along with the context
and browser_obj
. Then, you can use the playwright
object to create a new context for each method call.
Here's the modified _browser_context
method:
def _browser_context(self, auth=None, interactive=False, devtools=False, scale_factor=None):
p = sync_playwright().start()
browser_kwargs = dict(headless=not interactive, devtools=devtools, args=self.browser_args)
if self.browser == "chromium":
browser_obj = p.chromium.launch(**browser_kwargs)
elif self.browser == "firefox":
browser_obj = p.firefox.launch(**browser_kwargs)
elif self.browser == "webkit":
browser_obj = p.webkit.launch(**browser_kwargs)
else:
browser_kwargs["channel"] = self.browser
browser_obj = p.chromium.launch(**browser_kwargs)
context_args = {}
if auth:
context_args["storage_state"] = json.load(auth)
if scale_factor:
context_args["device_scale_factor"] = scale_factor
if self.reduced_motion:
context_args["reduced_motion"] = "reduce"
if self.user_agent is not None:
context_args["user_agent"] = self.user_agent
if self.bypass_csp:
context_args["bypass_csp"] = self.bypass_csp
if self.auth_username and self.auth_password:
context_args["http_credentials"] = {
"username": self.auth_username,
"password": self.auth_password,
}
context = browser_obj.new_context(**context_args)
if self.timeout:
context.set_default_timeout(self.timeout)
return p, context, browser_obj
In this updated version, the sync_playwright().start()
method is called to create a new Playwright instance, and it is returned along with the context
and browser_obj
.
Then, update the other methods to use the returned playwright
object to create a new context for each call:
def shot(self, url, *, selectors=None, selectors_all=None, js_selectors=None, js_selectors_all=None, javascript=None, width=None, height=None, quality=None, wait=None, wait_for=None, timeout=None, padding=0, omit_background=False, scale_factor=None):
# ...
p, context, browser_obj = self._browser_context()
try:
return take_shot(context, shot, return_bytes=True)
finally:
browser_obj.close()
p.stop()
def javascript(self, url, javascript):
p, context, browser_obj = self._browser_context()
try:
page = context.new_page()
response = page.goto(url)
result = _evaluate_js(page, javascript)
return result
finally:
browser_obj.close()
p.stop()
Make similar changes to the other methods (accessibility
, pdf
, html
) to use the new playwright
object.
With these modifications, each method call will create a new Playwright context, ensuring that the context is active and avoiding the "Event loop is closed" error.
After applying the fixes... this works!