Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# define a function to compute sentiments of the received tweets
def get_prediction(tweet_text):
    """Print the predicted sentiment for every tweet in one micro-batch.

    Used as the ``foreachRDD`` callback of the tweet stream. Relies on the
    module-level names ``Row``, ``spark`` and ``pipelineFit`` being defined
    elsewhere in the file.

    Args:
        tweet_text: RDD of tweet strings for the current batch.

    Returns:
        None. The 'tweet' and 'prediction' columns are shown on stdout.
        A batch with no non-empty tweets prints 'No data'. Any other failure
        is reported on stdout and swallowed so the stream keeps running.
    """
    try:
        # keep only the tweets whose length is greater than 0
        non_empty_tweets = tweet_text.filter(lambda x: len(x) > 0)
        # An empty batch is the normal "nothing arrived" case; handle it
        # explicitly instead of relying on createDataFrame raising.
        if non_empty_tweets.isEmpty():
            print('No data')
            return
        # wrap each tweet in a Row with a single column named 'tweet'
        rowRdd = non_empty_tweets.map(lambda w: Row(tweet=w))
        # create a spark dataframe
        wordsDataFrame = spark.createDataFrame(rowRdd)
        # transform the data using the pipeline and show the predicted sentiment
        pipelineFit.transform(wordsDataFrame).select('tweet', 'prediction').show()
    except Exception as exc:
        # Best effort: one bad batch must not stop the stream, but the real
        # cause is reported rather than being disguised as 'No data'.
        # KeyboardInterrupt / SystemExit are deliberately not caught.
        print('Prediction failed: %r' % (exc,))
# Streaming context on top of the existing SparkContext; batchDuration is
# the micro-batch interval in seconds.
ssc = StreamingContext(sc, batchDuration=3)
# The tweet feed endpoint comes from the command line: <hostname> <port>
# (for example localhost 9991).
stream_host = sys.argv[1]
stream_port = int(sys.argv[2])
lines = ssc.socketTextStream(stream_host, stream_port)
# One received line can carry several tweets separated by the keyword
# 'TWEET_APP'; split on it so each record holds the text of a single tweet.
words = lines.flatMap(lambda raw_line: raw_line.split('TWEET_APP'))
# Run the sentiment prediction on the RDD of every micro-batch.
words.foreachRDD(get_prediction)
# Start the computation, then block until it terminates.
ssc.start()
ssc.awaitTermination()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment