Created
February 25, 2017 19:51
-
-
Save marklit/82a1676a069e855e6f172a881ff616a5 to your computer and use it in GitHub Desktop.
CommonCrawl / AWS EMR / MRJob
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Fetch the cc-mrjob example project and install its Python dependencies.
git clone https://github.com/Smerity/cc-mrjob.git ~/cc-mrjob && cd ~/cc-mrjob
pip install -r requirements.txt

# Install the AWS command-line client and set up credentials (interactive).
pip install awscli
aws configure

# Download and unpack the list of WAT file paths for the CC-MAIN-2017-04 crawl.
curl -O https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2017-04/wat.paths.gz
gunzip wat.paths.gz

# Run the job on EMR using the settings in mrjob.conf. Results are written to
# the S3 output directory; --no-output keeps them from also being streamed
# back to this terminal.
python server_analysis.py \
    -r emr \
    --conf-path mrjob.conf \
    --no-output \
    --output-dir s3://cc-mr-out/ \
    wat.paths
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# mrjob configuration for the EMR runner.
runners:
  emr:
    region: us-east-1
    # Master node; a bid price requests a spot instance.
    # Bid prices are quoted so YAML keeps them as strings rather than floats.
    master_instance_type: m3.xlarge
    master_instance_bid_price: '0.07'
    # Core and task nodes: one of each, also bid for at the same price.
    instance_type: m3.xlarge
    num_core_instances: 1
    num_task_instances: 1
    core_instance_bid_price: '0.07'
    task_instance_bid_price: '0.07'
    image_version: 4.8.2
    # The job script uses Python 2 syntax, so run tasks with python2.7.
    interpreter: python2.7
    # Commands run on every node at start-up to install the job's dependencies.
    bootstrap:
    - sudo pip install boto mrjob simplejson warc
    - sudo pip install https://github.com/commoncrawl/gzipstream/archive/master.zip
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gzip | |
import json | |
import boto | |
from boto.s3.key import Key | |
from gzipstream import GzipStreamFile | |
from mrjob.job import MRJob | |
import warc | |
class CCJob(MRJob):
    """Base mrjob job that streams records out of gzipped WARC-format files.

    Each input line names one archive file. Subclasses override
    ``process_record`` to turn a single record into ``(key, count)`` pairs;
    the combiner and reducer then sum the counts per key.
    """

    def process_record(self, record):
        """
        Override process_record with your mapper

        Implementations should yield ``(key, value)`` pairs for ``record``.
        The base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError('Process record needs to be customized')

    def mapper(self, _, line):
        """Open the archive named by ``line`` and yield pairs for each record.

        ``line`` is used as an S3 key name when the runner is ``emr`` or
        ``hadoop`` and as a local gzip file path otherwise. The
        ``commoncrawl/processed_records`` counter is incremented once per
        record read.
        """
        # If we're on EC2 or running on a Hadoop cluster, pull files via S3
        if self.options.runner in ['emr', 'hadoop']:
            # Connect to Amazon S3 using anonymous credentials
            conn = boto.connect_s3(anon=True)
            # NOTE(review): confirm the input paths are valid keys in this
            # bucket for the crawl being processed - TODO verify.
            pds = conn.get_bucket('aws-publicdatasets')
            # Start a streaming connection to one of the WARC files
            k = Key(pds, line)
            f = warc.WARCFile(fileobj=GzipStreamFile(k))
        # If we're local, use files on the local file system
        else:
            # Report progress on stderr: stdout carries the job's output in
            # mrjob task processes, so stray text there can corrupt results.
            import sys
            sys.stderr.write('Loading local file {}\n'.format(line))
            f = warc.WARCFile(fileobj=gzip.open(line))

        for record in f:
            for key, value in self.process_record(record):
                yield key, value
            self.increment_counter('commoncrawl', 'processed_records', 1)

    def combiner(self, key, value):
        """Pre-aggregate on the mapper side: ``value`` is an iterable of counts."""
        yield key, sum(value)

    def reducer(self, key, value):
        """Emit the total for ``key``: ``value`` is an iterable of counts."""
        yield key, sum(value)
class ServerAnalysis(CCJob):
    """Count occurrences of each ``Server`` HTTP response header value."""

    def process_record(self, record):
        """Yield ``(server, 1)`` for a JSON record describing an HTTP response.

        Records whose ``Content-Type`` is not ``application/json`` and records
        whose ``WARC-Type`` is not ``response`` produce nothing. Responses
        without a ``Server`` header produce nothing either, but still count
        towards the ``commoncrawl/processed_pages`` counter.
        """
        # We're only interested in the JSON responses
        if record['Content-Type'] != 'application/json':
            return

        payload = record.payload.read()
        data = json.loads(payload)
        envelope = data['Envelope']

        # Only interested in 'response', skip the 'metadata' and 'request' entries
        if envelope['WARC-Header-Metadata']['WARC-Type'] != 'response':
            return

        # Guard only the lookup: a missing key anywhere along this path means
        # the response has no Server header, which is expected and skipped.
        try:
            server = envelope['Payload-Metadata']['HTTP-Response-Metadata']['Headers']['Server']
        except KeyError:
            pass
        else:
            yield server, 1
            self.increment_counter('commoncrawl', 'process_server_headers', 1)

        self.increment_counter('commoncrawl', 'processed_pages', 1)
if __name__ == '__main__':
    # Hand control to mrjob, which parses the command line and runs the job.
    ServerAnalysis.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment