@BrambleXu
Last active September 9, 2022 07:48
from collections import OrderedDict
import numpy as np
import spacy
from spacy.lang.en.stop_words import STOP_WORDS

nlp = spacy.load('en_core_web_sm')


class TextRank4Keyword():
    """Extract keywords from text"""

    def __init__(self):
        self.d = 0.85  # damping coefficient, usually is .85
        self.min_diff = 1e-5  # convergence threshold
        self.steps = 10  # iteration steps
        self.node_weight = None  # save keywords and their weights

    def set_stopwords(self, stopwords):
        """Set stop words"""
        for word in STOP_WORDS.union(set(stopwords)):
            lexeme = nlp.vocab[word]
            lexeme.is_stop = True

    def sentence_segment(self, doc, candidate_pos, lower):
        """Store only the words whose POS tag is in candidate_pos"""
        sentences = []
        for sent in doc.sents:
            selected_words = []
            for token in sent:
                # Store words only with candidate POS tags
                if token.pos_ in candidate_pos and token.is_stop is False:
                    if lower is True:
                        selected_words.append(token.text.lower())
                    else:
                        selected_words.append(token.text)
            sentences.append(selected_words)
        return sentences

    def get_vocab(self, sentences):
        """Get all tokens"""
        vocab = OrderedDict()
        i = 0
        for sentence in sentences:
            for word in sentence:
                if word not in vocab:
                    vocab[word] = i
                    i += 1
        return vocab

    def get_token_pairs(self, window_size, sentences):
        """Build token_pairs from windows in sentences"""
        token_pairs = list()
        for sentence in sentences:
            for i, word in enumerate(sentence):
                for j in range(i + 1, i + window_size):
                    if j >= len(sentence):
                        break
                    pair = (word, sentence[j])
                    if pair not in token_pairs:
                        token_pairs.append(pair)
        return token_pairs

    def symmetrize(self, a):
        """Make the matrix symmetric without double-counting the diagonal"""
        return a + a.T - np.diag(a.diagonal())

    def get_matrix(self, vocab, token_pairs):
        """Get normalized matrix"""
        # Build matrix
        vocab_size = len(vocab)
        g = np.zeros((vocab_size, vocab_size), dtype='float')
        for word1, word2 in token_pairs:
            i, j = vocab[word1], vocab[word2]
            g[i][j] = 1

        # Get symmetric matrix
        g = self.symmetrize(g)

        # Normalize matrix by column; `out` keeps zero-sum columns at 0
        # (with `where=` alone, the skipped entries would be uninitialized)
        norm = np.sum(g, axis=0)
        g_norm = np.divide(g, norm, out=np.zeros_like(g), where=norm != 0)

        return g_norm

    def get_keywords(self, number=10):
        """Print the top `number` keywords"""
        node_weight = OrderedDict(sorted(self.node_weight.items(), key=lambda t: t[1], reverse=True))
        for i, (key, value) in enumerate(node_weight.items()):
            print(key + ' - ' + str(value))
            if i + 1 >= number:  # stop after printing `number` keywords
                break

    def analyze(self, text,
                candidate_pos=['NOUN', 'PROPN'],
                window_size=4, lower=False, stopwords=list()):
        """Main function to analyze text"""

        # Set stop words
        self.set_stopwords(stopwords)

        # Parse text with spaCy
        doc = nlp(text)

        # Filter sentences
        sentences = self.sentence_segment(doc, candidate_pos, lower)  # list of list of words

        # Build vocabulary
        vocab = self.get_vocab(sentences)

        # Get token_pairs from windows
        token_pairs = self.get_token_pairs(window_size, sentences)

        # Get normalized matrix
        g = self.get_matrix(vocab, token_pairs)

        # Initialization of weights (PageRank values)
        pr = np.array([1] * len(vocab))

        # Iteration
        previous_pr = 0
        for epoch in range(self.steps):
            pr = (1 - self.d) + self.d * np.dot(g, pr)
            if abs(previous_pr - sum(pr)) < self.min_diff:
                break
            else:
                previous_pr = sum(pr)

        # Get weight for each node
        node_weight = dict()
        for word, index in vocab.items():
            node_weight[word] = pr[index]

        self.node_weight = node_weight
@BrambleXu
Author

# Usage
text = '''
The Wandering Earth, described as China’s first big-budget science fiction thriller, quietly made it onto screens at AMC theaters in North America this weekend, and it shows a new side of Chinese filmmaking — one focused toward futuristic spectacles rather than China’s traditionally grand, massive historical epics. At the same time, The Wandering Earth feels like a throwback to a few familiar eras of American filmmaking. While the film’s cast, setting, and tone are all Chinese, longtime science fiction fans are going to see a lot on the screen that reminds them of other movies, for better or worse.
'''

tr4w = TextRank4Keyword()
tr4w.analyze(text, candidate_pos=['NOUN', 'PROPN'], window_size=4, lower=False)
tr4w.get_keywords(10)

# Output
# science - 1.717603106506989
# fiction - 1.6952610926181002
# filmmaking - 1.4388798751402918
# China - 1.4259793786986021
# Earth - 1.3088154732297723
# tone - 1.1145002295684114
# Chinese - 1.0996896235078055
# Wandering - 1.0071059904601571
# weekend - 1.002449354657688
# America - 0.9976329264870932

@fabriziomiano

pretty neat

@derdanielb

How is this code licensed?

@BrambleXu
Author

@derdanielb
It is under the MIT License.

@derdanielb

derdanielb commented Sep 30, 2019

@BrambleXu

> @derdanielb
> It is under the MIT License.

Thanks for the quick reply!

@igormis

igormis commented Sep 10, 2020

Hi, is it possible to extract bigrams or trigrams as keywords?
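The gist itself only scores single tokens, but the original TextRank paper suggests a post-processing step: collapse runs of adjacent top-ranked words in the text into multi-word phrases. A minimal sketch of that idea, assuming analyze() has already been run (the merge_ngrams helper below is hypothetical, not part of the gist):

    def merge_ngrams(doc, node_weight, top_n=10, max_len=3):
        """Collapse runs of adjacent top-ranked tokens into phrases (bigrams/trigrams)."""
        top_words = set(sorted(node_weight, key=node_weight.get, reverse=True)[:top_n])
        phrases, current = set(), []
        for token in doc:
            if token.text in top_words:
                current.append(token.text)
            else:
                if 1 < len(current) <= max_len:
                    phrases.add(' '.join(current))
                current = []
        if 1 < len(current) <= max_len:
            phrases.add(' '.join(current))
        return phrases

    # e.g. merge_ngrams(nlp(text), tr4w.node_weight)  ->  {'science fiction', ...}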

@MSaifAsif

Very cool! Thanks

@paulthemagno

How to normalize the TextRank values between 0 and 1?
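The scores are unnormalized PageRank values, so one simple option (a sketch, not something the gist provides; normalize_scores is a hypothetical helper) is a min-max rescale of node_weight after analyze() has run:

    def normalize_scores(node_weight):
        """Min-max rescale TextRank scores into [0, 1]."""
        lo, hi = min(node_weight.values()), max(node_weight.values())
        if hi == lo:
            return {w: 1.0 for w in node_weight}  # all scores equal
        return {w: (s - lo) / (hi - lo) for w, s in node_weight.items()}

    # tr4w.node_weight = normalize_scores(tr4w.node_weight)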

@smanceau

smanceau commented May 17, 2022

Hi @BrambleXu, and thanks for your code.
I think this convergence check in analyze() stops the iterations too early:

        if abs(previous_pr - sum(pr)) < self.min_diff:
            break

Could be fixed this way:

        # Iteration
        #previous_pr = 0
        previous_pr = pr

        for epoch in range(self.steps):
            pr = (1-self.d) + self.d * np.dot(g, pr)

            #if abs(previous_pr - sum(pr))  < self.min_diff:
            if sum(abs(previous_pr - pr)) < self.min_diff:
                break
            else:
                #previous_pr = sum(pr)
                previous_pr = pr
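
A toy example (not from the thread) of why the element-wise check matters: two score vectors can have identical sums while the individual scores are still moving.

    import numpy as np

    prev, curr = np.array([0.6, 0.4]), np.array([0.4, 0.6])
    print(abs(sum(prev) - sum(curr)))  # 0.0   -> the original check would stop here
    print(sum(abs(prev - curr)))       # ~0.4  -> the fixed check keeps iterating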
