Created February 15, 2021 14:15
Compare keyword extraction results, in French, from TF/IDF, Yake and KeyBert.
contentloader.py:
import os
import codecs
import urllib3
import gzip
from pathlib import Path

# cChardet is faster and can be more accurate
try:
    import cchardet as chardet
except ImportError:
    import chardet

# https://www.crummy.com/software/BeautifulSoup/
from bs4 import BeautifulSoup

"""
Caching content reader that cleanses HTML from a web site.
Caching occurs at both the raw HTML level and the cleansed text level,
allowing us to reduce the overhead of calling the internet and/or
re-cleansing content on every run, unless one of those methods has changed.
"""


class CacheableReader(object):
    """
    A reader that downloads and cleanses content from the web,
    with local caching based on the final segment of the URL.
    """

    def __init__(self):
        # local folder for cached content
        self.cache_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'content_cache')
        Path(self.cache_folder).mkdir(parents=True, exist_ok=True)

    def get_site_text(self, url, force=False) -> str:
        """
        Provide cleansed text for the provided URL using a read-through local cache.
        @param url URL for the remote resource
        @param force Force a fresh download, bypassing the local cache
        @return string Cleansed text content for the provided URL
        """
        load_html_fresh = force is True or self._is_site_text_cached(url, 'html') is False
        load_text_fresh = force is True or load_html_fresh or self._is_site_text_cached(url, 'txt') is False

        if load_html_fresh:
            html = self._download_site_html(url)
            self._set_cache_site_content(url, html, 'html')
        else:
            html = self.get_cached_site_content(url, 'html')

        if load_text_fresh:
            text = self._cleaned_method(html)
            self._set_cache_site_content(url, text, 'txt')
        else:
            text = self.get_cached_site_content(url, 'txt')

        return text

    def get_cached_site_content(self, url, content_type='txt') -> str:
        """
        Returns the cached site content for the given URL from the local cache_folder.
        @param url Remote URL to load cached content for
        @param content_type Type of content to load from cache (html or txt)
        @return string Cached site content
        """
        cache_filename = self._create_cache_filename(url, content_type)
        with codecs.open(cache_filename, 'r', 'utf-8') as cache_file:
            return cache_file.read()

    def iter_list_txt_file(self):
        """
        Find all txt files in the cache folder.
        """
        return (f for f in os.listdir(self.cache_folder) if f.endswith(".txt"))

    def _create_cache_filename(self, url, content_type) -> str:
        """
        Generates a cache filename/path for a given cache folder and URL.
        @param url URL for the content that will be cached
        @param content_type Type of content (html or txt)
        """
        cache_name = Path(url).stem
        return os.path.join(self.cache_folder, "%s.%s" % (cache_name, content_type))

    def _is_site_text_cached(self, url, content_type) -> bool:
        """
        Verifies whether the provided URL has already been cached in the cache_folder.
        @param url URL of the content to check the cache for
        @param content_type Type of cached content to look for (html, txt)
        @return bool Whether the content is cached or not
        """
        cache_filename = self._create_cache_filename(url, content_type)
        return os.path.isfile(cache_filename) and os.path.getsize(cache_filename) > 0

    def _download_site_html(self, url) -> str:
        """
        Downloads the content of the specified URL.
        @param url URL to download content from
        @return string HTML content of the site
        """
        downloaded = self._fetch_url(url)
        if downloaded is not None:
            return downloaded
        else:
            return ''

    def _set_cache_site_content(self, url, text, content_type):
        """
        Cache the provided site content for the given URL in the cache_folder.
        @param url URL the provided text was downloaded from
        @param text Content to cache for the given URL
        @param content_type Type of content to cache (html or txt)
        """
        cache_filename = self._create_cache_filename(url, content_type)
        with codecs.open(cache_filename, 'w', 'utf-8') as cache_file:
            cache_file.write(text)

    def _cleaned_method(self, html) -> str:
        text = ' '.join(BeautifulSoup(html, "html.parser").stripped_strings)
        # text = ' '.join(BeautifulSoup(html, "html.parser").body.get_text(separator=' ').split())
        return text  # .replace("\n", " ").replace("\'", "")
        # Alternative extractors:
        # https://github.com/adbar/trafilatura
        # import trafilatura
        # text = trafilatura.extract(html)
        # https://github.com/weblyzard/inscriptis
        # import inscriptis
        # text = ' '.join(inscriptis.get_text(html).split())

    def _fetch_url(self, url, decode=True):
        """
        Fetches a page using urllib3 and decodes the response.
        Args:
            url: URL of the page to fetch.
            decode: Decode the response instead of returning the urllib3 response object (boolean).
        Returns:
            HTML code as a string, or the urllib3 response object (headers + body), or an empty
            string if the result is invalid, or None if there was a problem with the network.
        """
        timeout = 15
        retry_strategy = urllib3.util.Retry(
            total=3,
            redirect=2,
            connect=0,
            backoff_factor=timeout * 2,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        http_pool = urllib3.PoolManager(retries=retry_strategy)
        headers = {
            'User-Agent': 'Keyword Extraction 1.0',
        }
        try:
            response = http_pool.request('GET', url, headers=headers, timeout=timeout)
        except urllib3.exceptions.NewConnectionError:
            return None
        except urllib3.exceptions.MaxRetryError:
            return None
        except urllib3.exceptions.TimeoutError:
            return None
        except Exception:
            return None
        else:
            # safety checks
            if response.status != 200 or response.data is None:
                return None
            if decode is True:
                return self._decode_response(response.data)
            return response

    def _decode_response(self, response):
        """
        Read the urllib3 object corresponding to the server response,
        check whether it could be GZip and decompress it if necessary, then
        try to guess its encoding and decode it to return a unicode string.
        """
        if isinstance(response, bytes):
            resp_content = response
        elif self._is_gz_file(response.data):
            resp_content = gzip.decompress(response.data)
        else:
            resp_content = response.data
        guessed_encoding = self._detect_encoding(resp_content)
        # process
        htmltext = None
        if guessed_encoding is not None:
            try:
                htmltext = resp_content.decode(guessed_encoding)
            except UnicodeDecodeError:
                pass
        # force decoding
        if htmltext is None:
            htmltext = str(resp_content, encoding='utf-8', errors='replace')
        return htmltext

    def _is_gz_file(self, contents):
        """
        Tell whether a file's magic number corresponds to the GZip format.
        """
        # source: https://stackoverflow.com/questions/3703276/how-to-tell-if-a-file-is-gzip-compressed
        return contents[:2] == b'\x1f\x8b'

    def _detect_encoding(self, bytesobject):
        """
        Read the first chunk of input and return its encoding.
        """
        # unicode test
        if self._isutf8(bytesobject):
            return 'UTF-8'
        # try one of the installed detectors on the first part
        guess = chardet.detect(bytesobject[:1999])
        # fall back on the full response
        if guess is None or guess['confidence'] < 0.95:
            guess = chardet.detect(bytesobject)
        return guess['encoding']

    def _isutf8(self, data):
        """
        Simple heuristic to determine whether a bytestring uses standard unicode encoding.
        """
        try:
            data.decode('UTF-8')
        except UnicodeDecodeError:
            return False
        else:
            return True
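For reference, a minimal usage sketch of CacheableReader; the URL is one of the test pages from the comparison script further down, and the print call is only for illustration:

from contentloader import CacheableReader

reader = CacheableReader()
# The first call downloads, cleans and caches the page under content_cache/;
# subsequent calls for the same URL are served from the cache.
text = reader.get_site_text('http://cocon.se/cocon-semantique/')
print(text[:200])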
keybert_KW_extractor.py:
# https://github.com/MaartenGr/KeyBERT
from keybert import KeyBERT
# https://github.com/flairNLP/flair
from flair.embeddings import TransformerDocumentEmbeddings
# https://github.com/pandas-dev/pandas
import pandas as pd


class keybert_kw_extractor:
    def __init__(self, language='french'):
        self.language = language

    def calculate(self, corpus) -> pd.DataFrame:
        # roberta = TransformerDocumentEmbeddings('abhilash1910/french-roberta')
        roberta = TransformerDocumentEmbeddings('camembert-base')
        model = KeyBERT(model=roberta)
        text = ' '.join(corpus)
        keywords = model.extract_keywords(text, keyphrase_ngram_range=(1, 1), stop_words=None)
        return pd.DataFrame.from_records(
            keywords, columns=['KW', 'keybert']
        ).set_index('KW')
Comparison script (main entry point):
import contentloader
import tfidf_KW_extractor
import yake_KW_extractor
import keybert_KW_extractor


def load_content(urls_pages):
    html_reader = contentloader.CacheableReader()
    for url_page in urls_pages:
        html_reader.get_site_text(url_page)


def tf_idf_calculate():
    '''
    TF-IDF keyword extraction
    '''
    html_reader = contentloader.CacheableReader()
    tf_idf_extractor = tfidf_KW_extractor.tfidf_kw_extractor()
    corpus = [
        html_reader.get_cached_site_content(filename)
        for filename in html_reader.iter_list_txt_file()
    ]
    df_tf_idf = tf_idf_extractor.calculate(corpus)
    print(df_tf_idf.sort_values(by=['tfidf'], ascending=False).head(20))


def yake_calculate():
    '''
    YAKE keyword extraction
    '''
    html_reader = contentloader.CacheableReader()
    yake_extractor = yake_KW_extractor.yake_kw_extractor()
    corpus = [
        html_reader.get_cached_site_content(filename)
        for filename in html_reader.iter_list_txt_file()
    ]
    df_yake = yake_extractor.calculate(corpus)
    # The best-ranked keyword has the lowest score.
    print(df_yake.sort_values(by=['yake'], ascending=True).head(20))


def keybert_calculate():
    '''
    KeyBERT keyword extraction
    '''
    html_reader = contentloader.CacheableReader()
    keybert_extractor = keybert_KW_extractor.keybert_kw_extractor()
    corpus = [
        html_reader.get_cached_site_content(filename)
        for filename in html_reader.iter_list_txt_file()
    ]
    df_keybert = keybert_extractor.calculate(corpus)
    print(df_keybert.sort_values(by=['keybert'], ascending=False).head(20))


if __name__ == '__main__':
    test_links = [
        'http://cocon.se/cocon-semantique/',
        'http://cocon.se/cocon-semantique/cocon-seo/',
        'http://cocon.se/cocon-semantique/images/',
    ]
    load_content(test_links)
    tf_idf_calculate()
    yake_calculate()
    keybert_calculate()
tfidf_KW_extractor.py:
# https://github.com/scikit-learn/scikit-learn
from sklearn.feature_extraction.text import TfidfVectorizer
# https://github.com/nltk/nltk
# See Installing NLTK Data: http://www.nltk.org/data.html
import nltk
# https://github.com/pandas-dev/pandas
import pandas as pd


class tfidf_kw_extractor:
    def __init__(self, language='french'):
        self.language = language
        # Stop words in NLP: https://medium.com/@saitejaponugoti/stop-words-in-nlp-5b248dadad47
        # nltk_data\corpora\stopwords
        self.stop_words = nltk.corpus.stopwords.words(self.language)
        self.tfidf_vectorizer = TfidfVectorizer(
            stop_words=self.stop_words,
        )

    def calculate(self, corpus) -> pd.DataFrame:
        tfidf_vectorizer_vectors = self.tfidf_vectorizer.fit_transform(corpus)
        # take the TF-IDF vector of the first document in the corpus
        first_vector_tfidfvectorizer = tfidf_vectorizer_vectors[0]
        df = pd.DataFrame(
            first_vector_tfidfvectorizer.T.todense(),
            index=self.tfidf_vectorizer.get_feature_names(),
            columns=['tfidf']
        )
        df.index.names = ['KW']
        return df
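The NLTK stop-word corpus used by tfidf_kw_extractor has to be installed locally (see the nltk.org/data link above); a minimal one-off download, assuming network access, is:

import nltk

# One-off download of the stop-word corpus into the local nltk_data folder
nltk.download('stopwords')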
yake_KW_extractor.py:
# https://github.com/LIAAD/yake
import yake
# https://github.com/pandas-dev/pandas
import pandas as pd


class yake_kw_extractor:
    def __init__(self, language='fr'):
        self.language = language
        self.max_ngram_size = 1
        self.num_of_keywords = 20
        self.deduplication_threshold = 0.8

    def calculate(self, corpus) -> pd.DataFrame:
        custom_kw_extractor = yake.KeywordExtractor(
            lan=self.language,
            n=self.max_ngram_size,
            top=self.num_of_keywords,
            dedupLim=self.deduplication_threshold
        )
        text = ' '.join(corpus)
        keywords = custom_kw_extractor.extract_keywords(text)
        return pd.DataFrame.from_records(
            keywords, columns=['KW', 'yake']
        ).set_index('KW')
Comparison of the keyword extraction results from TF/IDF, Yake & KeyBert on the following 3 HTML pages:
http://cocon.se/cocon-semantique/
http://cocon.se/cocon-semantique/cocon-seo/
http://cocon.se/cocon-semantique/images/
Note: I use camembert-base (CamemBERT is a state-of-the-art language model for French based on the RoBERTa model) with KeyBert; a minimal sketch of this setup is shown below.
TF/IDF result:
Yake result:
KeyBert result:
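For reference, here is a minimal sketch of the KeyBERT + CamemBERT setup from keybert_KW_extractor.py, with a hypothetical variation: two-word keyphrases and NLTK's French stop-word list instead of the single-word, no-stop-word call used in the gist. The sample sentence and top_n value are arbitrary illustrations, not taken from the gist.

from flair.embeddings import TransformerDocumentEmbeddings
from keybert import KeyBERT
import nltk

# camembert-base loaded through flair, as in keybert_KW_extractor.py
camembert = TransformerDocumentEmbeddings('camembert-base')
model = KeyBERT(model=camembert)

# Hypothetical sample text, only for illustration
text = "Le cocon sémantique organise les pages d'un site autour d'une même thématique."

keywords = model.extract_keywords(
    text,
    keyphrase_ngram_range=(1, 2),                       # variation: allow one- and two-word keyphrases
    stop_words=nltk.corpus.stopwords.words('french'),   # variation: filter French stop words
    top_n=10,                                           # arbitrary choice
)
print(keywords)  # list of (keyphrase, score) tuples

KeyBERT forwards stop_words to scikit-learn's CountVectorizer, which only ships an English stop-word list, so the French stop words are passed as an explicit list here.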