# Kanjo - tweet sentiment scoring pipeline (Python 2)
import os
import gzip
import json
import re
import pprint
import esmre
from collections import deque
from senti_classifier import senti_classifier
import requests
import translitcodec
import jsonrpclib

def printTweet(tweet):
    pprint.pprint(tweet)

regiondict = json.loads(open('metadata/state_regions_divisions.json').read())
split_str = '::'

def tweetGeoTag(tweet):
    states = sorted(list(regiondict.keys()))
    geo = {'world': 'WW'}
    place = tweet.get('place', {}) or {}
    try:
        geo['country'] = place['country_code']
    except KeyError:
        pass
    try:
        coords = tweet['coordinates']['coordinates']
    except (KeyError, TypeError):
        coords = None
    if coords and geo.get('country', None) == 'US':
        # Exact point available: GeoJSON order is (longitude, latitude).
        point = {'latitude': coords[1],
                 'longitude': coords[0]}
        geo['state'] = decodeCoordinate(point)
    elif (place and place['bounding_box']['coordinates'][0]
            and geo.get('country') == 'US'):
        # No exact point: average the bounding-box corners instead.
        bbox = place['bounding_box']['coordinates'][0]
        avgcoord = map(lambda x: x * 1.0 / len(bbox),
                       reduce(lambda x, y: ((y[0] + x[0]),
                                            (y[1] + x[1])),
                              bbox, (0, 0)))
        avgcoord = {'latitude': avgcoord[1],
                    'longitude': avgcoord[0]}
        geo['state'] = decodeCoordinate(avgcoord)
        geo[place['place_type']] = place['name']
    state = geo.get('state', '')
    try:
        rd = regiondict[state]
    except KeyError:
        pass
    else:
        geo['division'] = rd['Division']
        geo['region'] = rd['Region']
    return geo
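
# Illustrative sketch (values hypothetical): for a geotagged US tweet the
# function above yields a dict along the lines of
#   {'world': 'WW', 'country': 'US', 'state': 'NY', 'city': 'Manhattan',
#    'division': 'Middle Atlantic', 'region': 'Northeast'}
# where the 'division' and 'region' strings come from
# metadata/state_regions_divisions.json, whose exact contents are an
# assumption here.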
statecoord = json.loads(open('metadata/state_coordinates.json').read())

def decodeCoordinate(target):
    # Map a lat/long point to the code of the nearest state centroid.
    longitude = lambda p: p['longitude']
    latitude = lambda p: p['latitude']

    def dist(p1, p2):
        # Squared euclidean distance in degrees; no sqrt needed for an argmin.
        xdist = (longitude(p1) - longitude(p2)) ** 2.0
        ydist = (latitude(p1) - latitude(p2)) ** 2.0
        return xdist + ydist

    dists = map(lambda state: {'delta': dist(state, target),
                               'code': state['state']}, statecoord)
    mdist = min(dists, key=lambda x: x['delta'])
    return mdist['code']
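
# Hedged usage sketch, assuming metadata/state_coordinates.json holds entries
# shaped like {'state': 'NY', 'latitude': ..., 'longitude': ...}: a point near
# New York City should resolve to that state's code, e.g.
#   decodeCoordinate({'latitude': 40.71, 'longitude': -74.01})  # -> 'NY'
# Squared distance in raw degrees ignores the earth's curvature, which is
# acceptable for coarse state-level bucketing.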

def tweetProcessText(tweet):
    # Normalise raw tweet text before lexicon scoring.
    # Convert to lower case
    tweet = tweet.lower()
    # Transliterate the crazy smilies (UTF-16) to ASCII but keep the normal ones
    tweet = tweet.encode('translit/long').encode('ascii', 'ignore')
    # Substitute the slangs
    tweet = substituteSlangs(tweet)
    # Remove www.* or https?://* (note [^\s], not [\s], after "www.")
    tweet = re.sub(r'((www\.[^\s]+)|(https?://[^\s]+))', '', tweet)
    # Remove @username, with an optional leading "rt"
    tweet = re.sub(r'(rt\s+)?@[^\s]+', '', tweet)
    # Collapse additional white space
    tweet = re.sub(r'[\s]+', ' ', tweet)
    # Replace #word with word
    tweet = re.sub(r'#([^\s]+)', r'\1', tweet)
    # Remove the numbers ([0-9]+, since [0-9]* also matched every empty
    # position in the string)
    tweet = re.sub(r'[0-9]+', '', tweet)
    # Remove punctuation
    tweet = re.sub(r'[^\w\s]', '', tweet)
    # Trim quotes and surrounding white space
    tweet = tweet.strip('\'"')
    tweet = tweet.strip()
    return tweet
#end
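
# Worked example of the cleanup chain (output approximate, since it also
# depends on what the slang table in metadata/ rewrites):
#   tweetProcessText(u'RT @bob check http://t.co/x #Pepsi rocks!!! 123')
#   -> 'check pepsi rocks'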

slanglist = [line.strip().split('\t') for line in open("metadata/SlangLookupTable.txt")]
SLANGS = esmre.Index()
for slang, replacement in slanglist:
    SLANGS.enter(slang, (slang, replacement))

def substituteSlangs(tweet):
    # esmre reports substring hits, so only swap exact whole-word matches.
    _sl = list(enumerate(tweet.split(' ')))
    for matchl in SLANGS.query(tweet):
        for i, word in _sl:
            if matchl[0] == word:
                _sl[i] = (i, matchl[1])
    return ' '.join([x[1] for x in _sl])
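
# Sketch, assuming metadata/SlangLookupTable.txt maps e.g. 'lol' to
# 'laughing out loud':
#   substituteSlangs('lol that works')  # -> 'laughing out loud that works'
# Unmatched words pass through untouched.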
emot = list(set([tuple(line.strip().split('\t')) for line in open("metadata/EmoticonLookupTable.txt")]))

def tweetEmoticonScore(tweet):
    tweet = tweet['text']
    tweet = tweet.encode('translit/long').encode('ascii', 'ignore')
    pos_emot = 0
    neg_emot = 0
    useful = "F"
    polarity = 0
    # Keep only the lexicon emoticons that appear as whole tokens.
    emot_score = filter(None, map(lambda x: x[0] in tweet.split() and x or None, emot))
    for score in emot_score:
        if int(score[1]) > 0:
            pos_emot += int(score[1])
        else:
            neg_emot += int(score[1])
    if pos_emot != 0 or neg_emot != 0:
        useful = "T"
    if pos_emot + neg_emot > 0:
        polarity = "P"
    elif pos_emot + neg_emot < 0:
        polarity = "N"
    emot_details = {'method': 'emoticon_score', 'matches': emot_score,
                    'pos_score': pos_emot, 'neg_score': neg_emot,
                    'useful': useful, 'polarity': polarity}
    return emot_details
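
# Example of the detail dict each scorer returns (scores hypothetical,
# assuming ':)' carries +1 in metadata/EmoticonLookupTable.txt):
#   tweetEmoticonScore({'text': u'hail coke :)'})
#   -> {'method': 'emoticon_score', 'matches': [(':)', '1')],
#       'pos_score': 1, 'neg_score': 0, 'useful': 'T', 'polarity': 'P'}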

hashdet = [line.strip().split('\t') for line in open("metadata/NRC-Hastag-Sentiment-Lexicon-unigrams-pmilexicon.txt")]
hashdict = {}
for term, score, num_pos, num_neg in hashdet:
    hashdict[term] = float(score)

def HASHTAGFit(tweet):
    hashtags = tweet['entities']['hashtags']
    pos_hash = 0
    neg_hash = 0
    useful = "F"
    polarity = 0
    matches = []
    for _hashd in hashtags:
        _hash = '#' + _hashd['text']
        try:
            match = hashdict[_hash]
        except KeyError:
            continue
        else:
            matches.append((_hash, match))
    for score in matches:
        # NRC scores are floats; compare them directly rather than via int(),
        # which truncated e.g. 0.5 down to 0 and misfiled it as negative.
        if score[1] > 0:
            pos_hash += score[1]
        else:
            neg_hash += score[1]
    if pos_hash != 0 or neg_hash != 0:
        useful = "T"
    if pos_hash + neg_hash > 0:
        polarity = "P"
    elif pos_hash + neg_hash < 0:
        polarity = "N"
    hashtags_details = {'method': 'hashtag_score', 'matches': matches,
                        'pos_score': pos_hash, 'neg_score': neg_hash,
                        'useful': useful, 'polarity': polarity}
    return hashtags_details

afinn = [line.strip().split('\t') for line in open("metadata/AFINN-111.txt")]
AFINN = esmre.Index()
for word, score in afinn:
    AFINN.enter(word, (word, score))

def AFINNFit(tweet):
    tweet = tweet['textProcessed']
    pos_words = 0
    neg_words = 0
    useful = "F"
    polarity = 0
    matches = []
    _st = tweet.split(' ')
    for match in AFINN.query(tweet):
        # esmre also reports substring hits; keep whole-word matches only.
        if match[0] in _st:
            matches.append(match)
    for score in matches:
        if int(score[1]) > 0:
            pos_words += int(score[1])
        else:
            neg_words += int(score[1])
    if pos_words != 0 or neg_words != 0:
        useful = "T"
    if pos_words + neg_words > 0:
        polarity = "P"
    elif pos_words + neg_words < 0:
        polarity = "N"
    afinn_details = {'method': 'afinn_score', 'matches': matches,
                     'pos_score': pos_words, 'neg_score': neg_words,
                     'useful': useful, 'polarity': polarity}
    return afinn_details
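
# Sketch using a real AFINN-111 entry ('good' scores +3), assuming esmre
# reports the hit once:
#   AFINNFit({'textProcessed': 'a good day'})
#   -> {'method': 'afinn_score', 'matches': [('good', '3')],
#       'pos_score': 3, 'neg_score': 0, 'useful': 'T', 'polarity': 'P'}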

def SENTIWORDNETFit(tweet):
    useful = "F"
    polarity = 0
    # senti_classifier expects a list of sentences; hand it the cleaned text
    # rather than the whole tweet dict.
    pos_score, neg_score = senti_classifier.polarity_scores([tweet['textProcessed']])
    if pos_score != 0 or neg_score != 0:
        useful = "T"
    if pos_score + neg_score > 0:
        polarity = "P"
    elif pos_score + neg_score < 0:
        polarity = "N"
    sentiwordnet_details = {'method': 'senti_classifier_score',
                            'pos_score': pos_score, 'neg_score': neg_score,
                            'useful': useful, 'polarity': polarity}
    return sentiwordnet_details

class StanfordNLP:
    def __init__(self, port_number=8080):
        self.server = jsonrpclib.Server("http://192.168.1.3:%d" % port_number)

    def parse(self, text):
        return self.server.parse(text)

nlp = StanfordNLP()

def STANFNLPFit(tweet):
    useful = "F"
    tweet = tweet['textProcessed']
    result = nlp.parse(tweet)
    stanfnlp_details = {'method': 'stanfnlp_score', 'polarity': result, 'useful': useful}
    return stanfnlp_details
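
# Assumed setup (not spelled out in the gist): a corenlp-python style JSON-RPC
# wrapper around a Stanford CoreNLP server listening at 192.168.1.3:8080. The
# raw parse payload is passed through as 'polarity' without any score
# reduction, so 'useful' stays "F".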

# Sentiment
def tweetSentiFit(tweet):
    # Run every scorer over the same tweet dict and collect the detail dicts.
    fit_methods = [AFINNFit, SENTIWORDNETFit, tweetEmoticonScore, HASHTAGFit, STANFNLPFit]
    fit = []
    for method in fit_methods:
        fit.append(method(tweet))
    return fit
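
# The resulting sentiment_vector is a five-element list, one detail dict per
# method, schematically:
#   [{'method': 'afinn_score', ...}, {'method': 'senti_classifier_score', ...},
#    {'method': 'emoticon_score', ...}, {'method': 'hashtag_score', ...},
#    {'method': 'stanfnlp_score', ...}]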

senti140 = deque()
senti140count = 0

def SENTI140Fit(tweet, callback):
    # Buffer tweets and classify them in batches through the Sentiment140
    # bulk JSON API, then hand the annotated batch to the callback.
    global senti140count
    if senti140count < 10:
        senti140count += 1
        senti140.append(tweet)
    else:
        senti140.append(tweet)
        senti140data = list(senti140)
        senti140.clear()
        senti140count = 0
        data = map(lambda x: {'text': x['text'], 'id': x['id'],
                              'query': 'pepsi'}, senti140data)
        senti140response = requests.post('http://www.sentiment140.com/api/bulkClassifyJson?appid=narulkargunjan@mgmt.iisc.ernet.in',
                                         data=json.dumps({'data': data}))
        senti140response_json = senti140response.json()
        senti140resp_dict = {}
        for tweets in senti140response_json['data']:
            senti140resp_dict[tweets['id']] = tweets['polarity']
        for orig_tweet in senti140data:
            try:
                polarity = senti140resp_dict[orig_tweet['id']]
            except KeyError:
                continue
            else:
                orig_tweet['sentiment_vector'].append({'senti140_score': polarity, 'useful': 'T'})
        callback(senti140data)
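
# Note: batches flush as 11 tweets (10 buffered plus the one that trips the
# limit), so a final partial batch at end-of-input is never classified or
# written; an explicit flush after the main loop would be needed to keep it.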

def write_tweets_train(tweets):
    # Dump one JSON document per line, gzipped, keyed by the first tweet id.
    start = tweets[0]['id']
    with gzip.open('/tmp/train_%s.gz' % start, 'wb') as f:
        f.write('\n'.join(json.dumps(x) for x in tweets))

files = os.listdir("data/testdata/")

def main():
    for f in files:
        fp = os.path.join("data/testdata/", f)
        with gzip.open(fp, 'rb') as zf:
            for line in zf.readlines():
                tweet = json.loads(line)
                # printTweet(tweet)
                # tweet['geoTag'] = tweetGeoTag(tweet)
                print tweet['text']
                tweet['textProcessed'] = tweetProcessText(tweet['text'])
                # print tweet['textProcessed']
                # tweet['emoticonScore'] = tweetEmoticonScore(tweet)
                # tweet['hashtagScore'] = HASHTAGFit(tweet)
                tweet['sentiment_vector'] = tweetSentiFit(tweet)
                print tweet['sentiment_vector']
                SENTI140Fit(tweet, write_tweets_train)
                # raise SystemExit

if __name__ == '__main__':
    main()
#print tweetSentiFit(tweetProcessText('Fuck pepsi'))
#print tweetEmoticonScore(('Fuck pepsi :@ Hail coke :) 8)'))
#print tweetProcessText('Fuck pepsi :@ Hail coke :) 8) rofl lmao g')
# print tweetSentiFit('fuck Pepsi :@ hail coke :) 8) rofl lmao g')
# print HASHTAGFit(['#lovedit','#foul'])