Skip to content

Instantly share code, notes, and snippets.

@sdaza
Last active October 28, 2017 02:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sdaza/3e6d875c91dea2058e80a6c04b47c42f to your computer and use it in GitHub Desktop.
Save sdaza/3e6d875c91dea2058e80a6c04b47c42f to your computer and use it in GitHub Desktop.
Spark
import re
from pyspark.sql import Row
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.types import DoubleType
from operator import add
from lxml import etree
# NOTE: `sc`, `sqlContext` and `localpath` are not defined in this file;
# presumably the hosting Spark shell/notebook session provides them.
# If `sqlContext` is missing it can be created with:
# from pyspark.sql import SQLContext
# sqlContext = SQLContext(sc)

# Load the raw post dumps as RDDs of text lines. Both RDDs are cached
# because later stages read them again.
train = (
    sc.textFile(localpath('spark-stats-data/post_training'))
    .cache()
)
test = (
    sc.textFile(localpath('spark-stats-data/post_test'))
    .cache()
)
# functions
def check_xml_posts(xml_text):
    """Return True when *xml_text* is a usable post record.

    A record is usable when it parses as XML and its root element carries
    both a ``Tags`` and a ``Body`` attribute, which is what ``get_tags`` and
    ``body_tags`` index into.

    Args:
        xml_text: one line of the post dump, expected to hold a single
            XML element.

    Returns:
        bool: True if the line can be parsed and has both attributes,
        False otherwise.
    """
    # Cheap substring pre-filter: lines that cannot possibly qualify never
    # reach the (comparatively expensive) XML parser.
    if 'Tags' not in xml_text or 'Body' not in xml_text:
        return False
    try:
        doc = etree.fromstring(xml_text)
    except (etree.LxmlError, ValueError):
        # LxmlError covers malformed markup; ValueError is what lxml raises
        # for unicode input that carries an encoding declaration.
        return False
    # The substring test above also matches "Tags"/"Body" occurring inside
    # text or other attribute values, so confirm the attributes really exist.
    return 'Tags' in doc.attrib and 'Body' in doc.attrib
def get_tags(line):
    """Return the tags of one post as a single space-separated string.

    Args:
        line: XML text of one post; its root element must have a ``Tags``
            attribute (a missing attribute raises ``KeyError``).

    Returns:
        str: the ``Tags`` attribute with every ``<`` and ``>`` replaced by
        a space and surrounding whitespace removed.
    """
    raw_tags = etree.XML(line).attrib['Tags']
    spaced = re.sub(r'<|>', ' ', raw_tags)
    return spaced.strip()
def body_tags(line):
    """Return ``[body, tags]`` extracted from one post.

    Args:
        line: XML text of one post; its root element must have ``Body`` and
            ``Tags`` attributes (a missing attribute raises ``KeyError``).

    Returns:
        list: a two-element list. The first item is the ``Body`` attribute
        with ``<p>`` / ``</p>`` replaced by spaces, lower-cased and
        stripped. The second item is the list of tag names obtained by
        replacing ``<`` / ``>`` with spaces in ``Tags`` and splitting on
        whitespace.
    """
    attributes = etree.XML(line).attrib
    raw_body = attributes['Body']
    raw_tags = attributes['Tags']
    body = re.sub(r'<p>|</p>', ' ', raw_body).lower().strip()
    tags = re.sub(r'<|>', ' ', raw_tags).strip().split()
    return [body, tags]
# setting the data
# Count how often each tag occurs across the valid training posts and keep
# the 100 most frequent (count, tag) pairs, highest count first.
pop_tags = train.filter(check_xml_posts) \
    .flatMap(lambda post: get_tags(post).split()) \
    .map(lambda tag: (tag, 1)) \
    .reduceByKey(add) \
    .map(lambda kv: (kv[1], kv[0])) \
    .sortByKey(False) \
    .take(100)
# Only the tag names are needed from here on; drop the counts.
# (The pair is indexed rather than unpacked in the lambda above because
# tuple parameters are a syntax error on Python 3.)
pop_tags = [tag for _count, tag in pop_tags]
# tokenizer and regression
# Feature pipeline stages shared by every per-tag model:
# "text" -> "words" (Tokenizer) -> "features" (HashingTF).
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="words",
)
hashingTF = HashingTF(
    inputCol="words",
    outputCol="features",
)
# Classifier that is fitted once per tag further down.
lr = LogisticRegression()
# training
# RDD of [body, tags] for every valid training post. It is re-read once per
# tag by the prediction loop, so cache it to avoid re-parsing the XML of
# every post on each iteration.
traintag = train.filter(check_xml_posts) \
    .map(body_tags) \
    .cache()
# testing
# One-column ("text") DataFrame holding the cleaned body of each valid test
# post; the tags of test posts are not used.
testingdata = sqlContext.createDataFrame(
    test.filter(check_xml_posts).map(lambda post: (body_tags(post)[0],)),
    schema=['text'])
# Featurise once and cache: every per-tag model scores this same frame, so
# without caching the parse/tokenise/hash work is redone for each model.
testingdata = hashingTF.transform(tokenizer.transform(testingdata)).cache()
# predictions
# For each popular tag, fit a binary "post has this tag" classifier on the
# training bodies and score every test post with it.
predictions = []
# Param maps must be keyed by Param objects (lr.maxIter), not by their
# string names; string keys do not match any Param and are never applied.
fit_params = {lr.maxIter: 10, lr.regParam: 0.01}
for tag in pop_tags:
    # Label is 1 when the post carries the current tag, else 0. `tag` is
    # bound as a default argument so the closure cannot observe a later
    # loop value.
    labelled = traintag.map(
        lambda x, tag=tag: Row(1, x[0]) if tag in x[1] else Row(0, x[0]))
    temp = sqlContext.createDataFrame(labelled, schema=['label', 'text'])
    temp = hashingTF.transform(tokenizer.transform(temp))
    # The label column is cast to double before fitting.
    training = temp[['label', 'features']] \
        .withColumn("label", temp.label.cast(DoubleType()))
    model = lr.fit(training, params=fit_params)
    pred = model.transform(testingdata)
    # Keep element 1 of each probability vector (presumably the
    # probability of label 1 -- confirm against the model's label order).
    predictions.append(
        (tag, pred.rdd.map(lambda row: row.probability[1]).collect()))
def classification():
    """Return the module-level ``predictions`` list.

    Returns:
        list: one ``(tag, values)`` tuple per popular tag, where ``values``
        is the collected list of element 1 of each test row's
        ``probability`` vector.
    """
    return predictions
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment