extract text content from a source
# -*- coding: utf-8 -*-
"""
New kind of text extractor:
Phase 1:
- remove scripts css svg comments canvas etc
- add space after each html tag
- strip tags
- normalize spaces/newlines
- keep the result
Phase 2:
- do x times:
- sample n pages from N
- store exact matches with more than y tokens in bad_sequences
Phase 3:
- for each page:
- remove everything that is in bad_sequences
- keep the rest
TODO:
add some document from a different source, and everything that match won't be removed
"""
__title__ = 'extractor'
__author__ = 'fboudot@pm.me'
__license__ = 'IV'
version_info = (0, 0, 1)
__version__ = '.'.join(map(str, version_info))
import sys
MIN_PYTHON = (3, 7, 1)
assert sys.version_info >= MIN_PYTHON, f"requires Python {'.'.join([str(n) for n in MIN_PYTHON])} or newer"
import logging
import pickle
import re
import requests
import time
from collections import defaultdict
from difflib import SequenceMatcher
from itertools import combinations
from random import sample
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s %(name)s %(levelname)s %(message)s'
)
logger = logging.getLogger('%s.%s' % (__title__, __name__))
logger.setLevel(level=logging.INFO)
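# Hard-coded source URLs could be listed below; as published both lists are empty,
# and pages are instead added to the cache at runtime via --addin / --addout.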
URLS_IN, URLS_OUT = [], []
CACHE_IN, CACHE_OUT = [], []
def timeit(func):
    def timed(*args, **kwargs):
        ts = time.time()
        r = func(*args, **kwargs)
        te = time.time()
        logger.info('%s took %.4fs', func.__name__, te - ts)
        return r
    return timed
def download_html():
    logger.info('downloading html from sources...')
    for url in URLS_IN:
        r = requests.get(url)
        CACHE_IN.append(r.text)
    for url in URLS_OUT:
        r = requests.get(url)
        CACHE_OUT.append(r.text)
def test(patterns_file=None, remove_tags=None):
    logger.info('learning patterns from sources...')
    patterns = learn_patterns_from_sources([html for html in CACHE_IN], [html for html in CACHE_OUT], remove_tags=remove_tags)
    if patterns_file is not None:
        logger.info(f'saving patterns in {patterns_file}...')
        try:
            with open(patterns_file, 'wb') as f:
                pickle.dump(patterns, f)
        except Exception as err:
            logger.error(f'unable to save patterns to {patterns_file}: {err}')
    logger.info('extracting text from sources...')
    return patterns, [text_from_html(html, patterns) for html in CACHE_IN]
def is_html_tag(word):
    return re.fullmatch(r'<\w+>', word) is not None
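# clean_html() rewrites every surviving tag to the bare form '<p>', '<div>', ...,
# so a simple fullmatch is enough to tell tags apart from words, e.g.:
#   is_html_tag('<div>')  -> True
#   is_html_tag('hello')  -> False
#   is_html_tag('</div>') -> False  (clean_tag() never emits a closing slash)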
@timeit
def learn_patterns_from_sources(html_in_source, html_outside_source=[], sample_max_size=30, remove_tags=None):
"""
html_in_source :: [raw_html, ...] :: raw html strings from source to scrap
html_outside_source :: [raw_html, ...] :: raw html strings from another sources
"""
    n_samples = min(len(html_in_source), sample_max_size)
    indexes = sample(range(len(html_in_source)), n_samples)
    texts = [clean_html(html_in_source[i], remove_tags).split(' ') for i in indexes]
    matches = defaultdict(int)
    N = n_samples * (n_samples - 1) / 2
    n = N - 1
    logger.info(f'{N} combinations of documents to be compared')
    # compare the sampled texts pairwise; texts is indexed 0..n_samples-1, so we
    # iterate over those positions (iterating over the raw page indexes would
    # raise an IndexError when the cache holds more than sample_max_size pages)
    for i, j in combinations(range(n_samples), 2):
        s = SequenceMatcher(None, texts[i], texts[j], autojunk=False)
        matches_ = defaultdict(int)
        for m in s.get_matching_blocks():
            if m.size > 0:
                matches_[tuple(texts[i][m.a:m.a + m.size])] += 1
        for k, count in matches_.items():
            k_len = len([w for w in k if not is_html_tag(w)])
            if k_len >= count:
                matches[k] += min(n, k_len)
    logger.debug(f'{len(matches)} patterns to test, those with score >= {N} will be accepted:')
    for k, count in matches.items():
        logger.debug(f' - {len(k)} words, score={count}, accepted={count >= N}')
        logger.debug(summary(str(k)))
    patterns = sorted([(list(k), len(k)) for k, count in matches.items() if count >= N], key=lambda x: -x[1])
    logger.info(f'{len(patterns)} patterns have been found')
    if patterns:
        n_samples = min(len(html_outside_source), sample_max_size)
        texts = [clean_html(text).split(' ') for text in sample(html_outside_source, n_samples)]
        for text in texts:
            i, text_len = 0, len(text)
            while i < text_len:
                pattern_to_remove = None
                word = text[i]
                for j, (pattern, pattern_len) in enumerate(patterns):
                    if word == pattern[0] and text[i:i+pattern_len] == pattern:
                        pattern_to_remove = j
                        i += pattern_len
                        break
                if pattern_to_remove is not None:
                    del patterns[pattern_to_remove]
                    logger.info(f'pattern {pattern_to_remove} of len {pattern_len} removed because it matched an outside source')
                    logger.debug(summary(str(pattern)))
                    continue
                i += 1
    logger.debug(f'{len(patterns)} patterns accepted:')
    for pattern, pattern_len in patterns:
        logger.debug(f' - {summary(str(pattern))} (len={pattern_len})')
    return patterns
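# Returns a list of (token_list, length) tuples sorted by decreasing length, each
# token list being a boilerplate sequence that text_from_html() can strip later,
# e.g. (made-up values): [(['<div>', 'Share', 'this', 'article', '<div>'], 5), ...]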
@timeit
def text_from_html(html, patterns, remove_tags=['nav', 'footer']):
    text = clean_html(html, remove_tags=remove_tags).split(' ')
    i, text_len = 0, len(text)
    new_text = []
    while i < text_len:
        removed_sequence = None
        word = text[i]
        for pattern, pattern_len in patterns:
            if word == pattern[0] and text[i:i+pattern_len] == pattern:
                removed_sequence = pattern
                i += pattern_len
                break
        if removed_sequence is not None:
            if len(new_text) and '\n' in removed_sequence:
                if new_text[-1] != '\n':
                    new_text.append('\n')
            continue
        new_text.append(word)
        i += 1
    return normalize_spaces(' '.join([w for w in new_text if not is_html_tag(w)]))
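# Typical use once patterns have been learned (URL below is only a placeholder):
#   patterns = learn_patterns_from_sources(CACHE_IN, CACHE_OUT)
#   text = text_from_html(requests.get('https://example.com/article').text, patterns)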
def summary(text, maxlen=100, separator=' [...] '):
    line = re.sub(r'\n+', '', text)
    maxlen -= len(separator)
    if len(line) <= maxlen:
        return line
    else:
        return line[:maxlen // 2] + separator + line[-maxlen // 2:]
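# Example (hypothetical input): summary('a' * 300) keeps the first 46 and last 47
# characters joined by ' [...] ', so long debug lines fit in roughly 100 characters.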
def clean_tag(match):
    paragraph_tags = {
        'p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'div', 'article', 'blockquote',
        'aside', 'caption', 'pre', 'code', 'center', 'header', 'footer', 'main',
        'section', 'summary', 'table', 'tr', 'tbody', 'tfoot', 'thead', 'label',
        'figcaption', 'button', 'nav',
    }
    tag = re.sub(r'\W+', '', match.group(2)).lower()
    if match.group(1) == '/' and tag in paragraph_tags:
        return f' <{tag}> \n '
    return f' <{tag}> '
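# clean_tag() is used as the re.sub() callback in clean_html(); for illustration:
#   '<div class="menu">'  ->  ' <div> '
#   '<span id="x">'       ->  ' <span> '
#   '</p>'                ->  ' <p> \n '   (closing block-level tags also emit a newline)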
@timeit
def clean_html(html, remove_tags=None):
    tags_to_decompose = {'script', 'style', 'canvas', 'svg', 'video', 'audio', 'object', 'applet', 'noframes', 'noscript', 'iframe'}
    if remove_tags is not None:
        tags_to_decompose |= set(remove_tags)
    # keep only body
    html = re.sub(r'^.*?<\s*body.*?>+', '<body>', html, flags=re.I+re.S)
    logger.debug('**** REMOVING ALL BEFORE <BODY> ****')
    logger.debug(summary(html))
    # remove html comments <!-- -->
    html = re.sub(r'<!--.*?-->', ' ', html, flags=re.S)
    logger.debug('**** REMOVING COMMENTS ****')
    logger.debug(summary(html))
    # remove script, style, canvas, svg, video, audio, object, applet, noframes, noscript, iframe and their content
    for tag in tags_to_decompose:
        html = re.sub(rf'<+\s*{tag}.*?>+.*?(<+\s*/\s*{tag}\s*>+)', ' \n ', html, flags=re.I+re.S)
        logger.debug(f'**** REMOVING <{tag}>...</{tag}> ****')
        logger.debug(summary(html))
    # normalize html tags
    html = re.sub(r'<+\s*(/?)\s*(\w+).*?>+', clean_tag, html, flags=re.S)
    logger.debug('**** NORMALIZING HTML TAGS ****')
    logger.debug(summary(html))
    # normalize spaces and newlines
    return normalize_spaces(html)
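# Rough example (attributes dropped, tags normalized, closing block tags add a newline):
#   clean_html('<body><p class="a">Hello <b>world</b></p></body>')
#   ->  '<body> <p> Hello <b> world <b> <p> \n <body>'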
def normalize_spaces(text):
    """
    trim text;
    whitespace runs collapse to ' \\n ' if they contain a newline, to a single space otherwise
    """
    text = re.sub(r'\s*\n\s*', ' \n ', text.strip())
    parts = re.split(r'(\s+)', text)
    for i in range(1, len(parts), 2):
        if parts[i] != ' \n ':
            parts[i] = ' '
    return ''.join(parts)
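# e.g. normalize_spaces('  a   b \n\n  c ')  ->  'a b \n c'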
def main(args=None):
    """
    Load (or download) the cached HTML, then dispatch on the CLI flags:
    --html, --addin/--addout, --test, or the default learn-and-extract run.
    """
    global CACHE_IN, CACHE_OUT
    patterns_file = f'{__file__}.patterns.pickle'
    remove_tags = ['nav', 'footer']
    cache_loaded = False
    try:
        logger.info('loading data...')
        with open(args.datafile, 'rb') as f:
            CACHE_IN, CACHE_OUT = pickle.load(f)
    except Exception as err:
        logger.warning('unable to load data: %s', err)
        download_html()
        if len(CACHE_IN) or len(CACHE_OUT):
            try:
                with open(args.datafile, 'wb') as f:
                    pickle.dump((CACHE_IN, CACHE_OUT), f)
            except Exception as err:
                logger.error(f'unable to save cached data: {err}')
            else:
                cache_loaded = True
    else:
        cache_loaded = True
    finally:
        logger.debug('len(CACHE_IN)=%d, len(CACHE_OUT)=%d', len(CACHE_IN), len(CACHE_OUT))
    if args.html:
        logger.setLevel(logging.DEBUG)
        for i, cleaned_html in enumerate([clean_html(html) for html in CACHE_OUT]):
            print(f'---------- CLEANED HTML {i} ----------')
            print(cleaned_html)
            print('--' * 50)
    elif args.addin or args.addout:
        items_added = 0
        if args.addin:
            try:
                r = requests.get(args.addin)
            except Exception as err:
                logger.error(f'unable to retrieve html: {err}')
            else:
                if r.text not in CACHE_IN:
                    CACHE_IN.append(r.text)
                    items_added += 1
                else:
                    logger.warning(f'{args.addin} is already cached')
        if args.addout:
            try:
                r = requests.get(args.addout)
            except Exception as err:
                logger.error(f'unable to retrieve html: {err}')
            else:
                if r.text not in CACHE_OUT:
                    CACHE_OUT.append(r.text)
                    items_added += 1
                else:
                    logger.warning(f'{args.addout} is already cached')
        if items_added:
            try:
                with open(args.datafile, 'wb') as f:
                    pickle.dump((CACHE_IN, CACHE_OUT), f)
            except Exception as err:
                logger.error(f'unable to save cached data: {err}')
            else:
                logger.info('cached data saved')
    elif args.test:
        try:
            r = requests.get(args.test)
        except Exception as err:
            logger.error(f'unable to download test url: {err}')
        else:
            try:
                with open(patterns_file, 'rb') as f:
                    patterns = pickle.load(f)
            except Exception as err:
                logger.error(f'unable to load patterns from {patterns_file}: {err}')
                logger.info('regenerating patterns from known sources...')
                patterns = learn_patterns_from_sources([html for html in CACHE_IN], [html for html in CACHE_OUT], remove_tags=remove_tags)
            logger.info(f'extracting text from {r.url}...')
            res = text_from_html(r.text, patterns)
            logger.info('done')
            print('---------- EXTRACTED TEXT ----------')
            print(res)
            print('--' * 50)
    else:
        pat, ret = test(patterns_file, remove_tags=remove_tags)
        print('*** PATTERNS ***')
        for pattern, pattern_len in pat:
            print(f" pattern with len {pattern_len}: {pattern}")
        print(f'{len(pat)} patterns')
        print('*** TEXTS EXTRACTED ***')
        for i, text in enumerate(ret):
            print(f'--------- TEXT {i} ----------')
            print(text)
            print('--' * 50)
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('datafile', type=str, help='file used to store and retrieve raw html downloaded from sources')
    parser.add_argument('--addin', type=str, help='add a URL to download to the source list')
    parser.add_argument('--addout', type=str, help='add a URL to download to the outside source list')
    parser.add_argument('--test', type=str, help='test content extraction on a new URL from the source site')
    parser.add_argument('--debug', '-d', action='store_true', help='verbose mode')
    parser.add_argument('--html', action='store_true', help='check html of outside sources')
    args = parser.parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)
    main(args)