Last active
September 18, 2015 00:14
-
-
Save ceteri/5c0bec69b0bce9480886 to your computer and use it in GitHub Desktop.
PySpark impl of TextRank, see http://www.cse.unt.edu/~rada/papers/mihalcea.emnlp04.pdf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import nltk | |
nltk.download() | |
## use nltk.download() within a Python prompt to | |
## download the `punkt` data | |
## Anaconda is recommended, to pick up NumPy, NLTK, etc. | |
## http://continuum.io/downloads | |
## this also requires TextBlob/PerceptronTagger | |
## pip install -U textblob textblob-aptagger |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## PySpark impl for TextRank by Mihalcea, et al. | |
## http://www.cse.unt.edu/~rada/papers/mihalcea.emnlp04.pdf | |
import re | |
from operator import add | |
from itertools import tee, izip | |
import numpy as np | |
import nltk.data | |
from nltk import stem | |
from text.blob import TextBlob as tb | |
from textblob_aptagger import PerceptronTagger | |
# Shared NLP resources, constructed once at import time.

# Porter stemmer used to normalise words before they become graph nodes.
STEMMER = stem.porter.PorterStemmer()

# Perceptron part-of-speech tagger from textblob_aptagger.
TAGGER = PerceptronTagger()

# Punkt sentence tokenizer for English, loaded from the NLTK data directory.
TOKENIZER = nltk.data.load('tokenizers/punkt/english.pickle')
def pos_tag(s):
    """Run the shared part-of-speech tagger over one sentence.

    Args:
        s: the sentence text to tag.

    Returns:
        Whatever `TAGGER.tag(s)` produces; `wrap_words` iterates it as a
        sequence of (word, tag) pairs.
    """
    # TAGGER is only read here, never rebound, so no `global` statement is needed.
    tagged = TAGGER.tag(s)
    return tagged
def wrap_words(pair):
    """Turn one tagged sentence into a list of per-word metadata dicts.

    Args:
        pair: a two-item sequence `(start, tagged)` where `start` is the
            document-wide index of the sentence's first word and `tagged` is
            an iterable of (word, tag) pairs.

    Returns:
        A list with one dict per word, holding its document-wide "index",
        its Porter "stem", the lower-cased "word", the PoS "tag", and a
        "keep" flag that is true only for the tags JJ, NN, NNS and NNP.
    """
    start, tagged = pair[0], pair[1]
    wrapped = []

    for offset, (word, tag) in enumerate(tagged):
        lowered = word.lower()
        wrapped.append({
            "index": start + offset,
            "stem": STEMMER.stem(lowered),
            "word": lowered,
            "tag": tag,
            "keep": tag in ('JJ', 'NN', 'NNS', 'NNP'),
        })

    return wrapped
def sliding_window(iterable, size):
    """Return every run of `size` consecutive items from `iterable`.

    The previous implementation depended on `xrange` and `itertools.izip`,
    which exist only on Python 2. This version uses builtins that behave the
    same on Python 2 and Python 3 and yields identical results.

    Args:
        iterable: any iterable; it is consumed fully.
        size: the window width.

    Returns:
        A list of `size`-tuples, one per window position, in order. The list
        is empty when `size` is 0 or when `iterable` has fewer than `size`
        items.

    Raises:
        ValueError: if `size` is negative.
    """
    if size < 0:
        raise ValueError("size must be >= 0")

    items = list(iterable)
    if size == 0:
        return []

    # A negative range bound (input shorter than the window) yields no tiles.
    return [tuple(items[i:i + size]) for i in range(len(items) - size + 1)]
def keep_pair(pair):
    """Decide whether a linked word pair belongs in the graph.

    A pair survives only when both words are flagged "keep" and their
    "word" values differ.

    Args:
        pair: a two-item sequence of word-metadata dicts.

    Returns:
        A falsy "keep" value if either word is not kept, otherwise whether
        the two words differ.
    """
    left = pair[0]
    kept = left["keep"]
    if not kept:
        return kept

    right = pair[1]
    kept = right["keep"]
    if not kept:
        return kept

    return left["word"] != right["word"]
def link_words(seq):
    """Pair the first item of `seq` with each of the items that follow it.

    Args:
        seq: an indexable, sliceable sequence (one sliding-window tile).

    Returns:
        A list of `(seq[0], other)` tuples, one per item after the first;
        empty when `seq` has fewer than two items.
    """
    linked = []
    for follower in seq[1:]:
        linked.append((seq[0], follower))
    return linked
def compute_contribs(links, rank):
    """Split a node's rank evenly across the nodes it links to.

    Args:
        links: a sized iterable of neighbouring node keys.
        rank: the current rank of the source node.

    Yields:
        `(neighbour, rank / len(links))` for every neighbour in `links`.
    """
    fan_out = len(links)

    for neighbour in links:
        share = rank / fan_out
        yield (neighbour, share)
def text_rank_graph(neighbors, iter=20):
    """Run the TextRank iteration over the `neighbors` graph of the document.

    Every node starts with rank 1.0. On each pass a node's rank is split
    evenly among its neighbours (see `compute_contribs`), the contributions
    are summed per node, and the sum is rescaled as `sum * 0.85 + 0.15`.

    The lambdas index into their argument instead of using tuple-parameter
    unpacking, which was removed in Python 3 (PEP 3113); `range` replaces the
    Python-2-only `xrange`. Behaviour on Python 2 is unchanged.

    Args:
        neighbors: an RDD of `(node, neighbour)` pairs; duplicates are removed.
        iter: number of passes to run. The name shadows the builtin but is
            kept because it is part of the existing call signature.

    Yields:
        `(node, rank)` pairs, highest rank first.
    """
    links = neighbors.distinct().groupByKey().cache()
    ranks = links.map(lambda entry: (entry[0], 1.0))

    for _ in range(iter):
        # Each joined record is (node, (neighbour_list, rank)).
        contribs = links.join(ranks).flatMap(
            lambda entry: compute_contribs(entry[1][0], entry[1][1]))
        ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

    # Swap to (rank, node) so sortByKey can order by rank, descending.
    ordered = ranks.map(lambda kv: (kv[1], kv[0])).sortByKey(False).collect()

    for rank, key in ordered:
        yield key, rank
def glean_rank(pair):
    """Reorder joined word metadata into a (K, V) pair for sorting by position.

    Args:
        pair: a `(key, (word_meta, rank))` record, as produced by joining the
            word metadata with the ranks. The key itself is not used.

    Returns:
        `(index, (word, rank))`, where `index` and `word` come from the
        metadata dict.
    """
    word_meta, rank = pair[1][0], pair[1][1]
    return word_meta["index"], (word_meta["word"], rank)
def extract_phrases(doc):
    """Extract ranked keyphrases from a position-sorted list of ranked words.

    Words whose indices are consecutive are merged into one phrase. A phrase
    is scored as the sum of its word ranks multiplied by its word count.

    The previous version only emitted a phrase when it met the next gap, so
    the last phrase in the document was never yielded; it is now flushed
    after the loop.

    Args:
        doc: an iterable of `(index, (word, rank))` records, sorted by index.

    Yields:
        `(score, phrase)` pairs in document order, where `phrase` is the
        space-joined words of one run of adjacent indices.
    """
    last_index = None
    rank_sum = 0.0
    words = []

    for index, (word, rank) in doc:
        # A gap of more than one position ends the current phrase.
        if words and index - last_index > 1:
            yield rank_sum * len(words), ' '.join(words)
            rank_sum = 0.0
            words = []

        rank_sum += rank
        last_index = index
        words.append(word)

    # Emit the phrase that was still being accumulated when the input ended.
    if words:
        yield rank_sum * len(words), ' '.join(words)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
TEXT = """
Compatibility of systems of linear constraints over the set of natural numbers.
Criteria of compatibility of a system of linear Diophantine equations, strict
inequations, and nonstrict inequations are considered. Upper bounds for
components of a minimal set of solutions and algorithms of construction of
minimal generating sets of solutions for all types of systems are given.
These criteria and the corresponding algorithms for constructing a minimal
supporting set of solutions can be used in solving all the considered types
systems and systems of mixed types.
"""

# Build a fully tagged document, segmented into sentences.
# `sc` is the SparkContext supplied by the PySpark shell or notebook.
sent = sc.parallelize(TOKENIZER.tokenize(TEXT)).map(pos_tag).cache()

# Starting word offset of each sentence: the running total of sentence
# lengths, shifted right by one (leading 0 added, final total dropped).
base = ([0] + list(np.cumsum(np.array(sent.map(len).collect()))))[:-1]
lens = sc.parallelize(base)

# Pair every sentence with its starting offset and wrap its words.
tagged_doc = lens.zip(sent).map(wrap_words).cache()
tagged_doc.collect()
# Slide a 3-word window over each sentence, link the first word of every
# window to the words that follow it, and keep only the relevant pairs.
tiled = (
    tagged_doc
    .flatMap(lambda s: sliding_window(s, 3))
    .flatMap(link_words)
    .filter(keep_pair)
)

# Emit each link in both directions, keyed by stem, to form the word graph.
t0 = tiled.map(lambda link: (link[0]["stem"], link[1]["stem"]))
t1 = tiled.map(lambda link: (link[1]["stem"], link[0]["stem"]))
neighbors = t0.union(t1)
# Draw the word graph, save it to disk, then display it.
import matplotlib.pyplot as plt
import networkx as nx

G = nx.Graph()

for src, dst in neighbors.collect():
    G.add_edge(src, dst, weight=1.0)

# Edge attributes are not needed for drawing, only the endpoints.
edges = [(u, v) for u, v, _ in G.edges(data=True)]
pos = nx.spring_layout(G)

nx.draw_networkx_nodes(G, pos, node_size=700)
nx.draw_networkx_edges(G, pos, edgelist=edges, width=6)
nx.draw_networkx_labels(G, pos, font_size=20, font_family='sans-serif')

plt.axis('off')
plt.savefig("weighted_graph.png")
plt.show()
# Run TextRank, then extract the top-ranked keyphrases.
rank = sc.parallelize(text_rank_graph(neighbors))
tags = tagged_doc.flatMap(lambda x: x).map(lambda w: (w["stem"], w,))

# Join each word with its stem's rank and restore document order.
l = tags.join(rank).map(glean_rank).sortByKey().collect()

# The loop variables are named `score`/`phrase` so the `rank` RDD above is not
# overwritten, and the single-argument print(...) form runs on Python 2 and 3.
for score, phrase in sorted(set(extract_phrases(l)), reverse=True):
    print("%0.2f %s" % (score, phrase,))
## also try monitoring http://localhost:4040/ while this is running, | |
## to see the dynamics of Spark in action | |
## NLTK has some bugs in terms of word segmentation, and its PoS taggers | |
## could be much better, but this gives a general idea of how TextRank works | |
## In practice, a combination of TF-IDF and TextRank makes a good blend
Looks like a markdown for cuneiform? :)
Hi, can we do unsupervised sentiment analysis using the NLTK or TextBlob packages for Python over Spark, i.e. PySpark? I mean, suppose I have different rows of sentences: after the entire pre-processing (tokenization, stop-word removal, PoS tagging, etc.), can we tag each sentence as positive, negative, or neutral in PySpark? Standalone, we can do this task in Python with the TextBlob or NLTK package. Will it be possible to do this task over Spark with Python? Please reply if you have any idea.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
\
\\\``````