Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Compare keyword extraction results, in French language, from TF/IDF, Yake, KeyBert.
import os
import codecs
import urllib3
import gzip
from pathlib import Path
# CChardet is faster and can be more accurate
try:
import cchardet as chardet
except ImportError:
import chardet
# https://www.crummy.com/software/BeautifulSoup/
from bs4 import BeautifulSoup
"""
Caching content reader that cleanses HTML from web site.
Cache occurs at both the raw HTML level and the cleansed text level,
allowing us to reduce the overhead of calling the internet and/or
re-cleansing content on every run through unless one of those methods
has changed
"""
class CacheableReader(object):
"""
A reader that downloads and cleanses content from the web,
with local caching based on the final segment of the URL
"""
def __init__(self):
# local folder cached text
self.cache_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'content_cache')
Path(self.cache_folder).mkdir(parents=True, exist_ok=True)
def get_site_text(self, url, force=False) -> str:
"""
Provide cleansed text for the provided URL utilizing a read-through local cache_site_text
@param url URL for the remote resource
@param force Force fresh download, bypassing the local cache
@return string Cleansed text content for the provided URL
"""
load_html_fresh = force is True or self._is_site_text_cached(url, 'html') is False
load_text_fresh = force is True or load_html_fresh or self._is_site_text_cached(url, 'txt') is False
if load_html_fresh:
html = self._download_site_html(url)
self._set_cache_site_content(url, html, 'html')
else:
html = self.get_cached_site_content(url, 'html')
if load_text_fresh:
text = self._cleaned_method(html)
self._set_cache_site_content(url, text, 'txt')
else:
text = self.get_cached_site_content(url, 'txt')
return text
def get_cached_site_content(self, url, content_type='txt') -> str:
"""
Returns the cached site text for the given URL from the local cache_folder
@param url remote URL that we want to load cache dtext for
@param content_type Type of content to load from cache (html or txt)
@return string cached site text
"""
cache_filename = self._create_cache_filename(url, content_type)
with codecs.open(cache_filename, 'r', 'utf-8') as cache_file:
return cache_file.read()
def iter_list_txt_file(self):
"""
Find all txt files in cache folder
"""
return (f for f in os.listdir(self.cache_folder) if f.endswith(".txt"))
def _create_cache_filename(self, url, content_type) -> bool:
"""
Generates a cache filename/path for a given cache folder and URL
@param url URL for the content that will be cached
@param content_type type of content (html or txt)
"""
cache_name = Path(url).stem
return os.path.join(self.cache_folder, "%s.%s" % (cache_name, content_type))
def _is_site_text_cached(self, url, content_type) -> bool:
"""
Verifies if provided URL has already been cached in the given cache_folder
@param url Url of content to check in cache for
@param content_type Type of cached content to look for (html, txt)
@return bool Whether the content is cached or not
"""
cache_filename = self._create_cache_filename(url, content_type)
return os.path.isfile(cache_filename) and os.path.getsize(cache_filename) > 0
def _download_site_html(self, url) -> str:
"""
Downloads and cleanses the content of the specified url
@param url URL to download content from
@return string html content of the site
"""
downloaded = self._fetch_url(url)
if downloaded is not None:
return downloaded
else:
return ''
def _set_cache_site_content(self, url, text, content_type):
"""
Cache the provided site text from the given URL to the provided cache_folder
@param url URL the provided text was downloaded from
@param text Cleansed text to cache for the given URL
@param content_type Type of content to cache (html or txt)
"""
cache_filename = self._create_cache_filename(url, content_type)
with codecs.open(cache_filename, 'w', 'utf-8') as cache_file:
cache_file.write(text)
def _cleaned_method(self, html) -> str:
text = ' '.join(BeautifulSoup(html, "html.parser").stripped_strings)
#text = ' '.join(BeautifulSoup(html, "html.parser").body.get_text(separator=' ').split())
return text # .replace("\n", " ").replace("\'", "")
# https://github.com/adbar/trafilatura
# import trafilatura
# text = trafilatura.extract(html)
# https://github.com/weblyzard/inscriptis
# import inscriptis
# text = ' '.join(inscriptis.get_text(html).split())
def _fetch_url(self, url, decode=True):
"""
Fetches page using request and decodes the response.
Args:
url: URL of the page to fetch.
decode: Decode response instead of returning Urllib3 response object (boolean).
Returns:
HTML code as string, or Urllib3 response object (headers + body), or empty string in case
the result is invalid, or None if there was a problem with the network.
"""
timeout = 15
retry_strategy = urllib3.util.Retry(
total=3,
redirect=2,
connect=0,
backoff_factor=timeout * 2,
status_forcelist=[429, 500, 502, 503, 504],
)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
http_pool = urllib3.PoolManager(retries=retry_strategy)
headers = {
'User-Agent': 'Keyword Extraction 1.0',
}
try:
response = http_pool.request('GET', url, headers=headers, timeout=timeout)
except urllib3.exceptions.NewConnectionError as err:
return None
except urllib3.exceptions.MaxRetryError as err:
return None
except urllib3.exceptions.TimeoutError as err:
return None
except Exception as err:
return None
else:
# safety checks
if response.status != 200 or response.data is None:
return None
if decode is True:
return self._decode_response(response.data)
return response
def _decode_response(self, response):
"""
Read the urllib3 object corresponding to the server response,
check if it could be GZip and eventually decompress it, then
try to guess its encoding and decode it to return a unicode string
"""
if isinstance(response, bytes):
resp_content = response
elif self._is_gz_file(response.data):
resp_content = gzip.decompress(response.data)
else:
resp_content = response.data
guessed_encoding = self._detect_encoding(resp_content)
# process
htmltext = None
if guessed_encoding is not None:
try:
htmltext = resp_content.decode(guessed_encoding)
except UnicodeDecodeError:
pass
# force decoding
if htmltext is None:
htmltext = str(resp_content, encoding='utf-8', errors='replace')
return htmltext
def _is_gz_file(self, contents):
"""
Tell if a file's magic number corresponds to the GZip format
"""
# source: https://stackoverflow.com/questions/3703276/how-to-tell-if-a-file-is-gzip-compressed
return contents[:2] == b'\x1f\x8b'
def _detect_encoding(self, bytesobject):
"""
Read the first chunk of input and return its encoding
"""
# unicode-test
if self._isutf8(bytesobject):
return 'UTF-8'
# try one of the installed detectors on first part
guess = chardet.detect(bytesobject[:1999])
# fallback on full response
if guess is None or guess['confidence'] < 0.95:
guess = chardet.detect(bytesobject)
return guess['encoding']
def _isutf8(self, data):
"""
Simple heuristic to determine if a bytestring uses standard unicode encoding
"""
try:
data.decode('UTF-8')
except UnicodeDecodeError:
return False
else:
return True
# https://github.com/MaartenGr/KeyBERT
from keybert import KeyBERT
# https://github.com/flairNLP/flair
from flair.embeddings import TransformerDocumentEmbeddings
# https://github.com/pandas-dev/pandas
import pandas as pd
class keybert_kw_extractor:
def __init__(self, language='french'):
self.language = language
def calculate(self, corpus) -> pd.DataFrame:
#roberta = TransformerDocumentEmbeddings('abhilash1910/french-roberta')
roberta = TransformerDocumentEmbeddings('camembert-base')
model = KeyBERT(model=roberta)
text = ' '.join(corpus)
keywords = model.extract_keywords(text, keyphrase_ngram_range=(1, 1), stop_words=None)
return pd.DataFrame.from_records(
keywords, columns=['KW', 'keybert']
).set_index('KW')
import contentloader
import tfidf_KW_extractor
import yake_KW_extractor
import keybert_KW_extractor
def load_content(urls_pages):
html_reader = contentloader.CacheableReader()
for url_page in urls_pages:
html_reader.get_site_text(url_page)
def tf_idf_calculate():
'''
TF-IDF keywords extraction
'''
html_reader = contentloader.CacheableReader()
tf_idf_extractor = tfidf_KW_extractor.tfidf_kw_extractor()
corpus = [
html_reader.get_cached_site_content(filename)
for filename in html_reader.iter_list_txt_file()
]
df_tf_idf = tf_idf_extractor.calculate(corpus)
print(df_tf_idf.sort_values(by=['tfidf'], ascending=False).head(20))
def yake_calculate():
'''
YAKE keywords extraction
'''
html_reader = contentloader.CacheableReader()
yake_extractor = yake_KW_extractor.yake_kw_extractor()
corpus = [
html_reader.get_cached_site_content(filename)
for filename in html_reader.iter_list_txt_file()
]
df_yake = yake_extractor.calculate(corpus)
# Le mot-clé le mieux classé a le score le plus bas.
print(df_yake.sort_values(by=['yake'], ascending=True).head(20))
def keybert_calculate():
'''
keyBert keywords extraction
'''
html_reader = contentloader.CacheableReader()
keybert_extractor = keybert_KW_extractor.keybert_kw_extractor()
corpus = [
html_reader.get_cached_site_content(filename)
for filename in html_reader.iter_list_txt_file()
]
df_keybert = keybert_extractor.calculate(corpus)
print(df_keybert.sort_values(by=['keybert'], ascending=False).head(20))
if __name__ == '__main__':
test_links = [
'http://cocon.se/cocon-semantique/',
'http://cocon.se/cocon-semantique/cocon-seo/',
'http://cocon.se/cocon-semantique/images/',
]
load_content(test_links)
tf_idf_calculate()
yake_calculate()
keybert_calculate()
# https://github.com/scikit-learn/scikit-learn
from sklearn.feature_extraction.text import TfidfVectorizer
# https://github.com/nltk/nltk
# See Installing NLTK Data: http://www.nltk.org/data.html
import nltk
# https://github.com/pandas-dev/pandas
import pandas as pd
class tfidf_kw_extractor:
def __init__(self, language='french'):
self.language = language
# Stop Words in NLP: https://medium.com/@saitejaponugoti/stop-words-in-nlp-5b248dadad47
# nltk_data\corpora\stopwords
self.stop_words = nltk.corpus.stopwords.words(self.language)
self.tfidf_vectorizer = TfidfVectorizer(
stop_words=self.stop_words,
)
def calculate(self, corpus) -> pd.DataFrame:
tfidf_vectorizer_vectors = self.tfidf_vectorizer.fit_transform(corpus)
first_vector_tfidfvectorizer = tfidf_vectorizer_vectors[1]
df = pd.DataFrame(first_vector_tfidfvectorizer.T.todense(), index=self.tfidf_vectorizer.get_feature_names(), columns=['tfidf'])
df.index.names = ['KW']
return df
# https://github.com/LIAAD/yake
import yake
# https://github.com/pandas-dev/pandas
import pandas as pd
class yake_kw_extractor:
def __init__(self, language='fr'):
self.language = language
self.max_ngram_size = 1
self.num_of_keywords = 20
self.deduplication_thresold = 0.8
def calculate(self, corpus) -> pd.DataFrame:
custom_kw_extractor = yake.KeywordExtractor(
lan=self.language,
n=self.max_ngram_size,
top=self.num_of_keywords,
dedupLim=self.deduplication_thresold
)
text = ' '.join(corpus)
keywords = custom_kw_extractor.extract_keywords(text)
return pd.DataFrame.from_records(
keywords, columns=['KW', 'yake']
).set_index('KW')
@LeMoussel

This comment has been minimized.

Copy link
Owner Author

@LeMoussel LeMoussel commented Feb 15, 2021

Comparison of keyword extraction results with TF/IDF, Yake & KeyBert of the following 3 HTML pages:

Rem: I use camembert-base (CamemBERT is a state-of-the-art language model for French based on the RoBERTa model) with KeyBert.

TF/IDF result:

KW tfidf
cocon 0.590238
sémantique 0.344305
seo 0.245932
surfeur 0.190010
mieux 0.126673
quand 0.126673
bon 0.126673
raisonnable 0.126673
caractéristiques 0.083280
chat 0.083280

Yake result:

KW yake
cocon 0.003085
sémantique 0.004769
seo 0.018149
site 0.025734
image 0.028868
cocon.se 0.033222
visualisations 0.047597
concept 0.061866
sites 0.072055
bourrelly 0.074559

KeyBert result:

KW keybert
dupliquées 0.7600
caractéristiques 0.7454
url 0.7453
optimisation 0.7399
python 0.7386

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment