Created
May 22, 2020 18:33
-
-
Save jaehunro/a81fdb4b181d139cb6b731e57782c797 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Match URLs from WARC records according to regular expression. | |
WARC records are those that match a SQL query on the columnar URL index. | |
""" | |
import re | |
from bs4 import BeautifulSoup | |
from bs4.dammit import EncodingDetector | |
from pyspark.sql import SparkSession | |
from pyspark.sql.types import StructType, StructField, StringType | |
from sparkcc import CCIndexWarcSparkJob | |
class CCIndexUrlMatchJob(CCIndexWarcSparkJob):
    """Match URLs in the text of WARC records against a regular expression.

    The records to scan are selected either by a SQL query on the
    columnar URL index or by a CSV listing (see load_dataframe).
    Matched URLs are deduplicated and written out as a single-column
    table.
    """

    name = 'CCIndexUrlMatchJob'

    # Output table has one nullable string column holding the matched URL.
    output_schema = StructType([StructField('key', StringType(), True)])

    # Spark accumulators; created per-job in init_accumulators().
    records_parsing_failed = None
    records_non_html = None

    # Matches http/https/ftp URLs whose host name starts with a CamelCase
    # word (two or more capitalized runs) followed by a known TLD.
    # NOTE(review): 'ra' in the TLD alternation is not an IANA TLD —
    # kept for behavior compatibility, but verify intent.
    url_pattern = re.compile(
        r'(?:https?|ftp)://(?:www\.)?[A-Z][a-z]*(?:[A-Z][a-z]*)+\.'
        '(?:aero|asia|biz|cat|com|coop|edu|gov|info|int|jobs|mil|mobi|'
        'museum|name|net|org|pro|tel|travel|ac|ad|ae|af|ag|ai|al|am|'
        'an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|'
        'bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|'
        'co|cr|cu|cv|cx|cy|cz|de|dj|dk|dm|do|dz|ec|ee|eg|er|es|et|'
        'eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|'
        'gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|'
        'is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|'
        'li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mn|mo|mp|'
        'mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|'
        'nz|nom|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ra|rs|'
        'ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|sk|sl|sm|sn|so|sr|st|su|'
        'sv|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|'
        'ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|'
        'zw|arpa)')

    def init_accumulators(self, sc):
        """Create this job's counters on top of the base-class accumulators."""
        super(CCIndexUrlMatchJob, self).init_accumulators(sc)
        self.records_parsing_failed = sc.accumulator(0)
        self.records_non_html = sc.accumulator(0)

    def log_aggregators(self, sc):
        """Log this job's counters in addition to the inherited ones."""
        super(CCIndexUrlMatchJob, self).log_aggregators(sc)
        self.log_aggregator(sc, self.records_parsing_failed,
                            'records failed to parse = {}')
        self.log_aggregator(sc, self.records_non_html,
                            'records not HTML = {}')

    def html_to_text(self, page, record):
        """Extract visible text from an HTML payload.

        :param page: raw HTML bytes of the record payload
        :param record: the WARC record (unused; kept for interface parity)
        :return: whitespace-normalized page text, or '' on parse failure
                 (failure is counted in records_parsing_failed)
        """
        try:
            encoding = EncodingDetector.find_declared_encoding(
                page, is_html=True)
            soup = BeautifulSoup(page, 'lxml', from_encoding=encoding)
            # Drop non-visible content before extracting text.
            for element in soup(['script', 'style']):
                element.extract()
            return soup.get_text(' ', strip=True)
        # Was a bare `except:` — narrowed so SystemExit/KeyboardInterrupt
        # are not silently swallowed.
        except Exception:
            self.records_parsing_failed.add(1)
            return ''

    def process_record(self, record):
        """Yield a (url,) tuple for every pattern match in an HTML record."""
        if not self.is_html(record):
            self.records_non_html.add(1)
            return
        # Read the payload only after the cheap content-type check
        # (the original read the full stream for non-HTML records too).
        page = record.content_stream().read()
        text = self.html_to_text(page, record)
        for url in CCIndexUrlMatchJob.url_pattern.findall(text):
            yield (url,)

    def load_dataframe(self, sc, partitions=-1):
        """Load the listing of WARC records to process.

        Uses the SQL query on the columnar index when --query is given,
        otherwise reads a CSV listing from self.args.csv.

        :param sc: SparkContext
        :param partitions: if > 0, repartition the result to this many
                           partitions; otherwise keep the input layout
        :return: DataFrame of WARC record coordinates
        """
        session = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()
        if self.args.query is not None:
            self.load_table(sc, session, self.args.input, self.args.table)
            sqldf = self.execute_query(sc, session, self.args.query)
        else:
            # NOTE(review): assumes an `--csv` argument is registered by
            # the (not shown) argument parser — confirm against sparkcc.
            sqldf = (session.read
                     .format('csv')
                     .option('header', True)
                     .option('inferSchema', True)
                     .load(self.args.csv))
        if partitions > 0:
            self.get_logger(sc).info(
                'Repartitioning data to {} partitions'.format(partitions))
            sqldf = sqldf.repartition(partitions)
        return sqldf

    def run_job(self, sc, sqlc):
        """Fetch the selected WARC records, match URLs, and save the result."""
        sqldf = self.load_dataframe(sc, self.args.num_input_partitions)
        warc_recs = sqldf.select('url', 'warc_filename', 'warc_record_offset',
                                 'warc_record_length').rdd
        # fetch_process_warc_records (base class) drives process_record();
        # distinct() deduplicates matched URLs across records.
        output = (warc_recs.mapPartitions(self.fetch_process_warc_records)
                  .distinct())
        (sqlc.createDataFrame(output, schema=self.output_schema)
         .coalesce(self.args.num_output_partitions)
         .write
         .format(self.args.output_format)
         .option('compression', self.args.output_compression)
         .options(**self.get_output_options())
         .saveAsTable(self.args.output))
        self.log_aggregators(sc)
if __name__ == '__main__':
    # Script entry point: build the job and hand control to sparkcc's runner.
    CCIndexUrlMatchJob().run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment