Skip to content

Instantly share code, notes, and snippets.

@alexbecker
Last active January 8, 2017 00:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save alexbecker/22d028961b23bbc9781e8b90fbccb085 to your computer and use it in GitHub Desktop.
Save alexbecker/22d028961b23bbc9781e8b90fbccb085 to your computer and use it in GitHub Desktop.
import hanzo.warctools.record as r
from http.client import HTTPResponse
from io import BytesIO
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from collections import defaultdict
class FakeSocket:
    """Socket stand-in that serves a fixed, in-memory bytes payload.

    Only ``makefile()`` is provided, which is the one method this file
    relies on when handing an instance to ``HTTPResponse``.
    """

    def __init__(self, response_str):
        """Wrap *response_str* (raw response bytes) in a seekable buffer."""
        buffer = BytesIO(response_str)
        self._file = buffer

    def makefile(self, *_args, **_kwargs):
        """Return the underlying buffer; mode/buffering arguments are ignored."""
        # The same BytesIO object is handed out on every call, so the read
        # position is shared between callers.
        return self._file
class ResponseRecord:
    """An HTTP response extracted from a single WARC record.

    Attributes set by the constructor:
        warc_headers: dict built from ``warc_record.headers``.
        url: result of ``urlparse`` on the decoded ``WARC-Target-URI`` header.
        headers: dict of the HTTP response headers.
        html: the HTTP response body as returned by ``HTTPResponse.read()``.
    """

    def __init__(self, warc_record):
        """Parse *warc_record* into WARC headers, HTTP headers and body.

        Raises:
            KeyError: if the record has no ``b'WARC-Target-URI'`` header.
            Exception: whatever ``HTTPResponse`` raises while parsing the
                record payload.
        """
        self.warc_headers = dict(warc_record.headers)

        # Header names are looked up as bytes keys; the value is decoded
        # before being handed to urlparse.
        target_uri = self.warc_headers[b'WARC-Target-URI']
        self.url = urlparse(target_uri.decode('utf-8'))

        # NOTE(review): index 1 of ``content`` is presumably the raw payload
        # (index 0 being the content type) -- confirm against warctools.
        payload = warc_record.content[1]

        # Feed the stored bytes through the stdlib HTTP parser by way of a
        # socket-like wrapper.
        response = HTTPResponse(FakeSocket(payload))
        self._response = response
        response.begin()

        self.headers = dict(response.getheaders())
        self.html = response.read()

        # Filled in lazily by the ``soup`` property.
        self._soup = None

    @property
    def soup(self):
        """BeautifulSoup tree for ``html``; built on first access, then cached."""
        if self._soup is not None:
            return self._soup
        self._soup = BeautifulSoup(self.html, 'html.parser')
        return self._soup

    @property
    def div_content(self):
        """The element whose ``id`` is ``"content"``, as returned by ``soup.find``."""
        tree = self.soup
        return tree.find(id="content")
def read_file(filepath):
    """Load the HTTP response records contained in a WARC archive.

    Args:
        filepath: Path of the archive; passed to
            ``ArchiveRecord.open_archive``.

    Returns:
        A list of ``ResponseRecord`` objects in the order they were read.
        Records for which ``ResponseRecord`` construction raises are
        reported on stdout and skipped.
    """
    records = []
    archive = r.ArchiveRecord.open_archive(filepath)
    try:
        while True:
            # NOTE(review): each call to read_records() is expected to yield
            # the next record from the shared stream as a tuple whose second
            # element is the record -- confirm against warctools.
            try:
                warc_record = next(archive.read_records())[1]
            except StopIteration:
                break
            # A None record marks the end of readable data.
            if warc_record is None:
                break
            try:
                records.append(ResponseRecord(warc_record))
            except Exception as e:
                # Deliberate best-effort: one unparseable record must not
                # abort the whole archive.
                print('Exception: {}'.format(e))
    finally:
        # Release the underlying file handle even if reading fails midway.
        archive.close()
    return records
def dedup_records(records):
    """Group records by URL path, most recent capture first in each group.

    Despite the name, no record is dropped: duplicates of a path are kept
    together so the caller can pick the first (newest) entry of each list.

    Args:
        records: Iterable of objects exposing ``url.path`` and a
            ``warc_headers`` mapping that contains ``b'WARC-Date'``.

    Returns:
        A ``defaultdict(list)`` mapping each URL path to its records,
        sorted by ``WARC-Date`` in descending order.

    Raises:
        KeyError: if a record has no ``b'WARC-Date'`` header.
    """
    # ``list`` as the factory (rather than a lambda) is the idiomatic form
    # and keeps the returned mapping picklable.
    by_path = defaultdict(list)
    for record in records:
        by_path[record.url.path].append(record)

    for group in by_path.values():
        # Sort by date, most recent first. Header values are compared as
        # stored; presumably ISO-8601 timestamps, so lexical order matches
        # chronological order -- confirm.
        group.sort(key=lambda rec: rec.warc_headers[b'WARC-Date'], reverse=True)
    return by_path
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment