"""Match URLs from WARC records according to regular expression.
WARC records are those that match a SQL query on the columnar URL index.
"""
import re

from bs4 import BeautifulSoup
from bs4.dammit import EncodingDetector
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

from sparkcc import CCIndexWarcSparkJob


class CCIndexUrlMatchJob(CCIndexWarcSparkJob):
    """Match URLs in the text of WARC records against a regular expression."""

    name = 'CCIndexUrlMatchJob'

    output_schema = StructType([StructField('key', StringType(), True)])

    # Accumulators, created per Spark context in init_accumulators()
    records_parsing_failed = None
    records_non_html = None

    # Matches absolute http(s)/ftp URLs whose host name is written in
    # CamelCase (at least two capitalized segments) and that end in one of
    # the generic or country-code TLDs below. All groups are non-capturing,
    # so findall() returns complete URLs rather than group contents.
    url_pattern = re.compile(
        r'(?:https?|ftp)://(?:www\.)?[A-Z][a-z]*(?:[A-Z][a-z]*)+\.'
        '(?:aero|asia|biz|cat|com|coop|edu|gov|info|int|jobs|mil|mobi|'
        'museum|name|net|org|pro|tel|travel|ac|ad|ae|af|ag|ai|al|am|'
        'an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|'
        'bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|'
        'co|cr|cu|cv|cx|cy|cz|de|dj|dk|dm|do|dz|ec|ee|eg|er|es|et|'
        'eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|'
        'gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|'
        'is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|'
        'li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mn|mo|mp|'
        'mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|'
        'nz|nom|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|'
        'ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|sk|sl|sm|sn|so|sr|st|su|'
        'sv|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|'
        'ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|'
        'zw|arpa)')
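    # For illustration (these literals are made-up examples, not from the
    # original gist): the pattern matches 'https://www.OpenStreetMap.org'
    # but not 'https://example.com', since the host must contain at least
    # two capitalized segments.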

    def init_accumulators(self, sc):
        super(CCIndexUrlMatchJob, self).init_accumulators(sc)
        self.records_parsing_failed = sc.accumulator(0)
        self.records_non_html = sc.accumulator(0)

    def log_aggregators(self, sc):
        super(CCIndexUrlMatchJob, self).log_aggregators(sc)
        self.log_aggregator(sc, self.records_parsing_failed,
                            'records failed to parse = {}')
        self.log_aggregator(sc, self.records_non_html,
                            'records not HTML = {}')

    def html_to_text(self, page, record):
        """Return the visible text of an HTML page, or '' on parse failure."""
        try:
            encoding = EncodingDetector.find_declared_encoding(
                page, is_html=True)
            soup = BeautifulSoup(page, 'lxml', from_encoding=encoding)
            # Drop script and style elements so only rendered text remains
            for script in soup(['script', 'style']):
                script.extract()
            return soup.get_text(' ', strip=True)
        except Exception:
            self.records_parsing_failed.add(1)
            return ''

    def process_record(self, record):
        # Skip non-HTML records before reading the payload
        if not self.is_html(record):
            self.records_non_html.add(1)
            return
        page = record.content_stream().read()
        text = self.html_to_text(page, record)
        # Emit one row per matched URL, matching the single-column
        # output_schema ('key')
        for url in CCIndexUrlMatchJob.url_pattern.findall(text):
            yield (url,)

    def load_dataframe(self, sc, partitions=-1):
        session = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()
        if self.args.query is not None:
            # Register the columnar URL index table and run the query on it
            self.load_table(sc, session, self.args.input, self.args.table)
            sqldf = self.execute_query(sc, session, self.args.query)
        else:
            # Otherwise read the list of WARC record coordinates from a CSV
            sqldf = (session.read
                     .format('csv')
                     .option('header', True)
                     .option('inferSchema', True)
                     .load(self.args.csv))
        if partitions > 0:
            self.get_logger(sc).info(
                'Repartitioning data to {} partitions'.format(partitions))
            sqldf = sqldf.repartition(partitions)
        return sqldf
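
    # The CSV accepted by load_dataframe above would need the same four
    # columns the query produces; a hedged, illustrative example row:
    #
    #   url,warc_filename,warc_record_offset,warc_record_length
    #   https://www.OpenStreetMap.org/,crawl-data/example.warc.gz,1234,5678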

    def run_job(self, sc, sqlc):
        sqldf = self.load_dataframe(sc, self.args.num_input_partitions)
        # Each row points at one WARC record: file, byte offset, length
        warc_recs = sqldf.select('url', 'warc_filename', 'warc_record_offset',
                                 'warc_record_length').rdd
        # Fetch the records, extract matching URLs, and deduplicate them
        output = (warc_recs.mapPartitions(self.fetch_process_warc_records)
                  .distinct())
        (sqlc.createDataFrame(output, schema=self.output_schema)
         .coalesce(self.args.num_output_partitions)
         .write
         .format(self.args.output_format)
         .option('compression', self.args.output_compression)
         .options(**self.get_output_options())
         .saveAsTable(self.args.output))
        self.log_aggregators(sc)


if __name__ == '__main__':
    job = CCIndexUrlMatchJob()
    job.run()
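
# A hedged sketch of inspecting the result afterwards ('spark' stands for
# an active SparkSession and 'url_match_output' is the illustrative output
# table name used above): the job writes a single column, 'key', holding
# each distinct matched URL.
#
#   spark.sql('SELECT key FROM url_match_output LIMIT 10') \
#        .show(truncate=False)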