Skip to content

Instantly share code, notes, and snippets.

@ptgolden
Last active November 24, 2015 16:12
Show Gist options
  • Save ptgolden/be1da65b013c13c3acdd to your computer and use it in GitHub Desktop.
Save ptgolden/be1da65b013c13c3acdd to your computer and use it in GitHub Desktop.
Python script to crawl a W3 mailing list and add it to an mbox file.
#!/usr/bin/python3
import html
import mailbox
import re
import sys
import time
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
from bs4.element import Comment
# Usage text shown for `-h` or an invalid number of arguments.
HELP_TEXT = """
Crawl a month of a W3 mailing list.
Usage: ./crawl.py <month_page> (<mbox outfile>)
If an mbox outfile is not provided, one will be created in the working
directory named after the list's associated group.
Example: ./crawl.py https://lists.w3.org/Archives/Public/www-tag/2005Mar/
"""
def main():
    """Entry point: validate the archive URL from argv, then crawl
    successive months into an mbox file until the user declines or a
    connection error / Ctrl-C stops the loop.
    """
    if len(sys.argv) == 1 or len(sys.argv) > 3 or sys.argv[1] == '-h':
        print(HELP_TEXT)
        return

    url = sys.argv[1]

    # Raw strings so \d and \w are regex escapes, not (deprecated)
    # string escapes.
    pattern = (r'https://lists.w3.org/.*/'
               r'(?P<group_name>[^/]+)/'
               r'(?P<time>\d{4}\w{3})/')
    match = re.match(pattern, url)
    if not match:
        raise ValueError(
            'This is not a valid URL for a monthly archive. '
            'It should look something like:\n  '
            'https://lists.w3.org/Archives/Public/mygroup/2005Mar/')

    # Default output filename is derived from the list's group name.
    mbox_filename = (
        sys.argv[2] if len(sys.argv) == 3
        else '{group_name}.mbox'.format(**match.groupdict()))

    mbox = mailbox.mbox(mbox_filename)
    mbox.lock()
    try:
        while True:
            crawl_month(url, mbox)
            month = re.match(pattern, url).groupdict()['time']
            next_month = get_next_month(month)
            confirm_next = input('Crawl next month ({})? '.format(next_month))
            if confirm_next == 'y':
                # Advance the URL in place, e.g. 2005Mar -> 2005Apr.
                url = url.replace(month, next_month)
            else:
                break
    except requests.RequestException:
        print('Connection error while trying to fetch {}.'.format(url))
    except KeyboardInterrupt:
        print('Stopping...')
    finally:
        # close() also flushes the mailbox and releases the lock.
        mbox.close()
def get_next_month(timestr):
    """Return the month following *timestr*, which has the form
    '2005Mar' (four-digit year + three-letter month abbreviation).
    December wraps into January of the next year.
    """
    MONTHS = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
              'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')
    year, month = int(timestr[:4]), timestr[4:]
    idx = MONTHS.index(month)
    if idx == 11:
        # December: advance the year and restart at January.
        return '{}Jan'.format(year + 1)
    return '{}{}'.format(year, MONTHS[idx + 1])
def crawl_month(month_url, mbox):
    """
    Fetch every message listed on a monthly archive index page and add
    each one to *mbox*, skipping messages already present (matched by
    the URL stored in their Archived-At header).
    """
    month_req = requests.get(month_url)
    # Name the parser explicitly so behavior doesn't depend on which
    # bs4 backends happen to be installed.
    month_soup = BeautifulSoup(month_req.content, 'html.parser')

    all_message_urls = {
        urljoin(month_url, message['href'])
        for message in month_soup.select('.messages-list li a')
        if 'href' in message.attrs and message.text
    }
    # Archived-At values are stored wrapped in <...>; strip the angle
    # brackets before comparing against the index URLs.
    urls_to_process = sorted(all_message_urls.difference({
        mailbox.mboxMessage(msg)['Archived-At'][1:-1] for msg in mbox
    }))
    if not urls_to_process:
        print('All messages for this month already fetched.')

    for i, url in enumerate(urls_to_process):
        print('Fetching {}/{} ({})'.format(i + 1, len(urls_to_process), url))
        try:
            message = fetch_message(url)
            mbox.add(message)
            time.sleep(1)  # be polite to the archive server
        except requests.RequestException:
            # Connection-level failures abort the whole crawl; main()
            # reports them.
            raise
        except Exception as err:
            # Anything else (parse failure, missing element) is shown
            # and the user decides whether to keep going.
            print('\nFailed to fetch {}. Exception was: '.format(url))
            print('\n ' + str(err) + '\n')
            cont = input('Continue anyway (y/n)? ')
            if cont != 'y':
                raise
def fetch_message(url):
    """
    Fetch a single archived message page and convert it to an
    mboxMessage: headers are rebuilt from the page's metadata comments
    (see message_metadata) and the body comes from <pre id="body">.
    """
    req = requests.get(url)
    # Explicit parser keeps parsing consistent across bs4 installs.
    soup = BeautifulSoup(req.content, 'html.parser')
    body = soup.find('pre', attrs={'id': 'body'}).text.strip()
    metadata = message_metadata(soup)

    message = mailbox.mboxMessage()
    # The mbox "From " separator line: envelope sender + received date.
    message.set_from('{email} {received}'.format(**metadata))
    message.set_charset(metadata['charset'])
    message.add_header('From', '{name} <{email}>'.format(**metadata))
    message.add_header('Message-ID', '<{id}>'.format(**metadata))
    # Record the source URL so crawl_month can skip it next time.
    message.add_header('Archived-At', '<{}>'.format(url))
    message.add_header('Date', metadata['sent'])
    message.add_header('Subject', metadata['subject'])
    if 'inreplyto' in metadata:
        message.add_header('In-Reply-To', '<{inreplyto}>'.format(**metadata))
    message.set_payload(body.encode('utf-8'))
    return message
def message_metadata(soup):
    """
    Extract message metadata from the HTML comments inside the page's
    <div class="head">. Each comment has the form key="value"; values
    are returned with the surrounding quote characters stripped and
    HTML entities unescaped for the fields that need it.
    """
    head = soup.find('div', attrs={'class': 'head'})
    comments = head.find_all(text=lambda t: isinstance(t, Comment))

    metadata = {}
    for raw in comments:
        key, value = raw.strip().split('=', 1)
        metadata[key] = value[1:-1]  # drop the surrounding quotes

    metadata['email'] = html.unescape(metadata['email'])
    metadata['subject'] = html.unescape(metadata['subject'].replace('\t', ' '))
    if 'inreplyto' in metadata:
        metadata['inreplyto'] = html.unescape(metadata['inreplyto'])
    return metadata
# Run the crawler only when executed as a script (not on import).
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment