@adriantre · Created March 30, 2017 11:47
# -*- coding: utf-8 -*-
import os
import re
import sys

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
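
# Computes TF-IDF scores for the terms of one document (a single listing or a
# whole neighbourhood) against a corpus of listing descriptions.
# Assumed invocation, inferred from the argument parsing below:
#   spark-submit tf_idf.py <listings.tsv> <doc_type: n|l> <row_key>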
def parseRowDescription(desc, distinct):
    """Strip URLs and punctuation from a description and split it into words."""
    url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|'
                             r'[$-_@.&+]|[!*(),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    # Drop URLs, replace non-letters with spaces, remove one-letter words,
    # lowercase, split on spaces, and discard the empty strings left behind.
    desc_list = [word for word in
                 re.sub(r'\b\w\b', '',
                        re.sub(r'[^a-zA-Z\s]', ' ',
                               url_pattern.sub('', desc))
                        ).lower().split(' ')
                 if word]
    if distinct:
        desc_list = list(set(desc_list))
    return desc_list
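
# Illustrative example (hypothetical input):
#   parseRowDescription("Cozy flat! See https://example.com/a b", False)
#   -> ['cozy', 'flat', 'see']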
def parseAllDescriptions(rdd, distinct):
    """Parse every (key, description) pair into (key, list-of-words)."""
    # Python 3 lambdas cannot unpack tuples, so index into the pair instead.
    return rdd.map(lambda kv: (kv[0], parseRowDescription(kv[1], distinct)))
def getNumOccurrencesOfTerms(rdd):
    """Count how many times each term occurs in the document."""
    return rdd.flatMap(lambda kv: kv[1])\
              .map(lambda word: (word, 1))\
              .reduceByKey(lambda a, b: a + b)
def getNumTermsInDoc(rdd):
    """Count the number of distinct terms in the document."""
    return rdd.flatMap(lambda kv: kv[1]).distinct().count()
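
# Note: the TF denominator produced above is the number of *distinct* terms in
# the document, not the total word count; the scoring below follows that choice.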
def calcTF_IDF(term_occ, num_terms_in_doc, num_documents):
    """Score each term and return (term, tf-idf) pairs, highest score first.

    term_occ holds (term, (occurrences_in_doc, num_docs_with_term)). The IDF
    here is the raw ratio num_documents / num_docs_with_term, without the
    logarithm used in the textbook definition.
    """
    return term_occ.map(lambda kv:
                        (float(kv[1][0]) / num_terms_in_doc
                         * num_documents / kv[1][1], kv[0]))\
                   .sortByKey(False)\
                   .map(lambda vk: (vk[1], vk[0]))
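
# Illustrative arithmetic (made-up numbers): a term occurring 3 times in a
# document of 100 distinct terms, and appearing in 5 of 1000 documents,
# scores 3/100 * 1000/5 = 6.0.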
if __name__ == "__main__":
    sc = SparkContext("local", "TF-IDF")
    sc.setLogLevel("ERROR")
    spark = SparkSession.builder\
        .master("local")\
        .appName("TF-IDF")\
        .getOrCreate()
    input_file = sys.argv[1]
    doc_type = sys.argv[2]
    row_key = sys.argv[3]

    input_file_name = os.path.basename(input_file)
    input_file_folder = os.path.dirname(os.path.abspath(input_file))

    # Assumes a tab-separated file with a header row containing at least the
    # 'id', 'neighbourhood' and 'description' columns.
    file_df = spark.read.csv(input_file, sep='\t', header=True)
    # 'n' treats a whole neighbourhood as one document; 'l' treats a single
    # listing as one document.
    if doc_type == 'n':
        id_key = 'neighbourhood'
    elif doc_type == 'l':
        id_key = 'id'
    else:
        sys.exit('Unrecognized doc_type: use "n" for neighbourhood or "l" for listing.')
    # Keep only rows that actually have a description.
    all_documents = file_df.na.drop(subset=['description'])\
        .select(id_key, 'description')

    # The "main" document: the row(s) whose key matches row_key.
    main_row_rdd = all_documents\
        .filter(col(id_key) == row_key)\
        .rdd.map(tuple)
    if doc_type == 'n':
        # A neighbourhood document is the concatenation of all of its
        # listings' descriptions.
        main_row_rdd = main_row_rdd.reduceByKey(lambda a, b: a + b)

    main_row_parsed = parseAllDescriptions(main_row_rdd, False)
    all_documents_rdd = all_documents.rdd
    all_documents_parsed = parseAllDescriptions(all_documents_rdd, True)
    num_occ = getNumOccurrencesOfTerms(main_row_parsed)
    num_terms_in_doc = getNumTermsInDoc(main_row_parsed)
    num_documents = all_documents_parsed.count()

    # Because the corpus was parsed with distinct=True, each document
    # contributes a word at most once, so this count is the number of
    # documents containing the term.
    num_doc_with_term = all_documents_parsed.flatMap(lambda kv: kv[1])\
        .map(lambda word: (word, 1))\
        .reduceByKey(lambda a, b: a + b)

    term_occ = num_occ.join(num_doc_with_term)
    terms_tf_idf = calcTF_IDF(term_occ, num_terms_in_doc, num_documents)
    output_path = '{}/{}_{}_tf_idf.tsv'.format(input_file_folder,
                                               input_file_name, row_key)
    # saveAsTextFile writes a directory of part files; coalesce(1) keeps it
    # to a single part.
    terms_tf_idf.map(lambda kv: '{}\t{}'.format(kv[0], kv[1]))\
        .coalesce(1).saveAsTextFile(output_path)
    sc.stop()
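
# Example run (hypothetical file name and key, following the argument parsing
# above):
#   spark-submit tf_idf.py listings.tsv l 12345
# writes the scores to the directory listings.tsv_12345_tf_idf.tsv/ next to
# the input file.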