Skip to content

Instantly share code, notes, and snippets.

@marklit
Created February 25, 2017 19:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marklit/82a1676a069e855e6f172a881ff616a5 to your computer and use it in GitHub Desktop.
CommonCrawl / AWS EMR / MRJob
# Grab Smerity's Common Crawl MRJob examples and install their dependencies.
git clone https://github.com/Smerity/cc-mrjob.git ~/cc-mrjob && cd ~/cc-mrjob
pip install -r requirements.txt
pip install awscli
# Interactively configure AWS credentials and default region for the CLI.
aws configure
# Download the manifest of WAT archive paths for the Jan 2017 crawl and unpack it.
curl -O https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2017-04/wat.paths.gz
gunzip wat.paths.gz
# Launch the analysis on EMR using mrjob.conf; per-record output is suppressed
# locally (--no-output) and results are written to the S3 output directory.
python server_analysis.py \
-r emr \
--conf-path mrjob.conf \
--no-output \
--output-dir s3://cc-mr-out/ \
wat.paths
# mrjob.conf — EMR runner settings for the Common Crawl job.
# NOTE(review): the original paste had lost all YAML indentation, which makes
# the file invalid; the conventional mrjob nesting is restored here with the
# same keys and values.
runners:
  emr:
    region: us-east-1
    # Spot-priced m3.xlarge instances for the master, core and task groups.
    master_instance_type: m3.xlarge
    master_instance_bid_price: '0.07'
    instance_type: m3.xlarge
    num_core_instances: 1
    num_task_instances: 1
    core_instance_bid_price: '0.07'
    task_instance_bid_price: '0.07'
    image_version: 4.8.2
    interpreter: python2.7
    # Install the job's Python dependencies on every cluster node at startup.
    bootstrap:
    - sudo pip install boto mrjob simplejson warc
    - sudo pip install https://github.com/commoncrawl/gzipstream/archive/master.zip
import gzip
import json
import boto
from boto.s3.key import Key
from gzipstream import GzipStreamFile
from mrjob.job import MRJob
import warc
class CCJob(MRJob):
    """Base MRJob for streaming records out of Common Crawl archives.

    Subclasses implement ``process_record``; the mapper handles locating and
    decompressing the archive (from S3 on EMR/Hadoop, from disk locally).
    """

    def process_record(self, record):
        """Mapper hook: override to yield (key, value) pairs per WARC record."""
        raise NotImplementedError('Process record needs to be customized')

    def mapper(self, _, line):
        """Open the archive named by ``line`` and run every record through
        ``process_record``, re-emitting whatever it yields."""
        if self.options.runner in ('emr', 'hadoop'):
            # On a cluster, stream the archive straight out of S3; the crawl
            # bucket is public, so anonymous credentials suffice.
            connection = boto.connect_s3(anon=True)
            bucket = connection.get_bucket('aws-publicdatasets')
            archive = warc.WARCFile(fileobj=GzipStreamFile(Key(bucket, line)))
        else:
            # Local runs read a gzipped archive from the filesystem.
            print('Loading local file {}'.format(line))
            archive = warc.WARCFile(fileobj=gzip.open(line))
        for record in archive:
            for emitted in self.process_record(record):
                yield emitted
            self.increment_counter('commoncrawl', 'processed_records', 1)

    def combiner(self, key, value):
        # Pre-aggregate counts on the mapper side to cut shuffle volume.
        yield key, sum(value)

    def reducer(self, key, value):
        # Final aggregation of the per-key counts.
        yield key, sum(value)
class ServerAnalysis(CCJob):
    """Tallies HTTP ``Server`` response headers across WAT metadata records."""

    def process_record(self, record):
        # WAT records carry their metadata as JSON; anything else is skipped.
        if record['Content-Type'] != 'application/json':
            return
        data = json.loads(record.payload.read())
        # Each page yields 'metadata', 'request' and 'response' envelopes;
        # only the 'response' one is of interest here.
        if data['Envelope']['WARC-Header-Metadata']['WARC-Type'] != 'response':
            return
        try:
            server_name = (data['Envelope']['Payload-Metadata']
                           ['HTTP-Response-Metadata']['Headers']['Server'])
        except KeyError:
            # Not every response advertises a Server header; skip it quietly.
            pass
        else:
            yield server_name, 1
            self.increment_counter('commoncrawl', 'process_server_headers', 1)
        self.increment_counter('commoncrawl', 'processed_pages', 1)
# Entry point: hand control to mrjob, which re-invokes this script in the
# mapper/combiner/reducer roles when running on a cluster.
if __name__ == '__main__':
    ServerAnalysis.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment