Instantly share code, notes, and snippets.

Embed
What would you like to do?
import re
import sys
from subprocess import Popen, PIPE
from multiprocessing import Pool
# URL prefix that s3_url() prepends to bucket-relative paths.
PDS_BASE = 'http://aws-publicdatasets.s3.amazonaws.com/'

# Crawl period (the first command-line argument) -> bucket-relative path of
# the gzipped listing that names one WARC file per line.
WARC_PATHS = {
    '2014-12': 'common-crawl/crawl-data/CC-MAIN-2014-52/warc.paths.gz',
    '2015-07': 'common-crawl/crawl-data/CC-MAIN-2015-32/warc.paths.gz',
}

# Values written to the status column of a chunk's ``.status`` file.
RUNNING = 'running'
FINISHED = 'finished'

# Provider tag emitted with every match found by grep_warc().
GOOGLE = 'google'
def s3_url(path, base=PDS_BASE):
    """Return the full URL for *path* by prefixing it with *base*.

    Args:
        path: Location relative to ``base``; appended verbatim.
        base: URL prefix. Defaults to ``PDS_BASE``. No separator is inserted,
            so it should already end with ``/``.

    Returns:
        ``base`` and ``path`` concatenated.
    """
    return base + path
def download_gz(url):
    """Stream the decompressed content found at *url*, one line at a time.

    Runs the equivalent of the shell pipeline ``curl -s URL | gunzip`` and
    reads its output lazily, so the download is never held in memory as a
    whole.

    Args:
        url: Address of a gzip-compressed resource that ``curl`` can fetch.

    Yields:
        Each decompressed line, trailing newline included, as a native
        ``str``. On Python 3 the pipe delivers bytes, which are decoded as
        UTF-8 with undecodable bytes replaced; on Python 2 lines are passed
        through unchanged.

    Raises:
        IOError: If the stream was read to its end but ``curl`` or
            ``gunzip`` reported a failure, meaning the data is incomplete.
    """
    curl = Popen(['curl', '-s', url], stdout=PIPE)
    gunzip = Popen(['gunzip'], stdin=curl.stdout, stdout=PIPE)
    curl.stdout.close()  # Allow curl to receive a SIGPIPE if gunzip exits.
    try:
        for line in gunzip.stdout:
            if not isinstance(line, str):
                # Python 3: decode once here so every consumer sees text.
                line = line.decode('utf-8', 'replace')
            yield line
    finally:
        # Runs on normal exhaustion and when the consumer abandons the
        # generator early: close our end of the pipe so the children stop,
        # then reap them so they do not linger as zombies.
        gunzip.stdout.close()
        gunzip_status = gunzip.wait()
        curl_status = curl.wait()
    # Only reached when the stream was consumed completely. gzip exits with
    # status 2 for mere warnings, which still produce usable output.
    if curl_status != 0 or gunzip_status not in (0, 2):
        raise IOError(
            'download of {0} failed (curl exit status {1}, '
            'gunzip exit status {2})'.format(url, curl_status, gunzip_status)
        )
def download_warc_paths(path):
    """Yield the entries of a gzipped path listing, one per line.

    Args:
        path: Location of the gzipped listing, relative to the base URL
            used by ``s3_url``.

    Yields:
        Each line of the decompressed listing with its trailing newline
        removed.
    """
    for line in download_gz(s3_url(path)):
        yield line.rstrip('\n')
def download_warc(path):
    """Return an iterator over the decompressed lines of one WARC file.

    Args:
        path: Location of the gzipped WARC file, relative to the base URL
            used by ``s3_url``.

    Returns:
        The generator produced by ``download_gz`` for that file's URL.
    """
    return download_gz(s3_url(path))
def grep_warc(warc):
    """Find records in a WARC stream that reference Google Analytics.

    The stream is scanned line by line. The most recent ``WARC-Target-URI``
    and ``WARC-Date`` header lines are remembered; the first later line that
    contains ``www.google-analytics.com/analytics.js`` produces one result
    for that URI, after which both headers are forgotten so that a record
    is reported at most once.

    Args:
        warc: Iterable of text lines of a decompressed WARC file.

    Yields:
        ``(GOOGLE, date, url)`` tuples, where ``date`` and ``url`` are the
        header values with surrounding whitespace removed. ``date`` is an
        empty string if no ``WARC-Date`` header was seen for the record.
    """
    needle = 'www.google-analytics.com/analytics.js'
    url_line = None
    date_line = None
    for line in warc:
        if line.startswith('WARC-Target-URI'):
            url_line = line
        elif line.startswith('WARC-Date'):
            date_line = line
        elif url_line is not None and needle in line:
            # The value is everything after the first colon; strip() drops
            # the leading space and whatever line ending is present, rather
            # than assuming exactly '\r\n' as a fixed slice would.
            url = url_line.partition(':')[2].strip()
            if date_line is None:
                date = ''
            else:
                date = date_line.partition(':')[2].strip()
            yield GOOGLE, date, url
            url_line = None
            date_line = None
def chunk_status_path(chunk):
    """Return the name of the status file for *chunk*: ``'<chunk>.status'``.

    Args:
        chunk: Chunk identifier; formatted with ``str.format``.

    Returns:
        The file name, relative to the current working directory.
    """
    return '{0}.status'.format(chunk)
def update_download_progress(chunk, progress, status):
    """Overwrite the status file of *chunk* with its current state.

    The file holds a single tab-separated line: chunk, progress, status.

    Args:
        chunk: Chunk identifier; also determines the file name through
            ``chunk_status_path``.
        progress: Progress figure to record.
        status: State label to record, e.g. ``RUNNING`` or ``FINISHED``.
    """
    record = '{chunk}\t{progress}\t{status}\n'.format(
        chunk=chunk,
        progress=progress,
        status=status
    )
    # Mode 'w' truncates, so the file only ever contains the latest record.
    with open(chunk_status_path(chunk), 'w') as status_file:
        status_file.write(record)
def log_download_progress(chunk, warc, every=1000000):
    """Pass the lines of *warc* through while recording progress.

    Args:
        chunk: Chunk identifier whose status file is updated.
        warc: Iterable of lines to pass through unchanged.
        every: Write a ``RUNNING`` update each time this many lines have
            been passed through (including once before the first line).

    Yields:
        Every item of ``warc``, in order.

    When ``warc`` is exhausted, a ``FINISHED`` update is written with the
    total number of lines. This also covers an empty stream, which is
    reported as 0 lines instead of failing on an unbound loop variable.
    """
    lines_done = 0
    for line in warc:
        if lines_done % every == 0:
            update_download_progress(chunk, lines_done, RUNNING)
        yield line
        lines_done += 1
    # Report the count of lines, consistent with the RUNNING updates, which
    # state how many lines had been handed on before the current one.
    update_download_progress(chunk, lines_done, FINISHED)
def chunk_results_path(chunk):
    """Return the name of the results file for *chunk*: ``'<chunk>.output'``.

    Args:
        chunk: Chunk identifier; formatted with ``str.format``.

    Returns:
        The file name, relative to the current working directory.
    """
    return '{0}.output'.format(chunk)
def write_grep_results(chunk, results):
    """Write the matches found for *chunk* to its results file.

    Each match becomes one tab-separated line: chunk, provider, date, url.
    Any existing results file for the chunk is overwritten.

    Args:
        chunk: Chunk identifier; also determines the file name through
            ``chunk_results_path``.
        results: Iterable of ``(provider, date, url)`` tuples. It is
            consumed lazily, one item per line written.
    """
    row = '{chunk}\t{provider}\t{date}\t{url}\n'
    # buffering=1 requests line buffering, so each match reaches the file
    # as soon as it is found.
    with open(chunk_results_path(chunk), 'w', buffering=1) as output:
        for provider, date, url in results:
            output.write(
                row.format(chunk=chunk, provider=provider, date=date, url=url)
            )
def run_grep_warc(chunk, path):
    """Download one WARC file, grep it, and write the matches to disk.

    The three stages are chained generators, so the file is processed as a
    stream: lines are downloaded, counted for the status file, searched,
    and matches written, without holding the whole file in memory.

    Args:
        chunk: Chunk identifier used to name the status and results files.
        path: Location of the gzipped WARC file, as accepted by
            ``download_warc``.
    """
    warc = log_download_progress(chunk, download_warc(path))
    urls = grep_warc(warc)
    write_grep_results(chunk, urls)
def run_parallel_greps(paths, start, stop):
    """Grep the WARC files ``paths[start:stop]`` in parallel.

    One worker process is started per file; the index of a file in
    ``paths`` is used as its chunk identifier.

    Args:
        paths: Sequence of WARC file locations.
        start: Index of the first file to process.
        stop: Index one past the last file to process; clamped to
            ``len(paths)``.

    Raises:
        Exception: Whatever exception a worker raised, re-raised here after
            all workers have finished (the first failure in chunk order).
    """
    stop = min(stop, len(paths))
    chunks = range(start, stop)
    if len(chunks) == 0:
        # Nothing to do; Pool() rejects a process count below 1.
        return
    pool = Pool(processes=len(chunks))
    pending = [
        pool.apply_async(run_grep_warc, (chunk, paths[chunk]))
        for chunk in chunks
    ]
    pool.close()
    pool.join()
    # apply_async stores a worker's exception in its result object; without
    # get() a failed chunk would go completely unnoticed.
    for result in pending:
        result.get()
# Command-line entry point: grep the WARC files numbered start..stop-1 of
# the given crawl period.
if __name__ == '__main__':
    args = sys.argv
    if len(args) != 4:
        sys.exit('Usage: grep period start stop')
    _, period, start, stop = args
    # Reject bad arguments with a readable message instead of a traceback.
    if period not in WARC_PATHS:
        sys.exit(
            'Unknown period {0!r}; choose one of: {1}'
            .format(period, ', '.join(sorted(WARC_PATHS)))
        )
    try:
        start = int(start)
        stop = int(stop)
    except ValueError:
        sys.exit('start and stop must be integers')
    path = WARC_PATHS[period]
    # The listing is materialised because chunks are addressed by index.
    paths = list(download_warc_paths(path))
    run_parallel_greps(paths, start, stop)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment