Text classification using Twitter, MeCab, TokyoCabinet and nltk.
#!/usr/bin/env python2.6
# coding: utf-8

import twitter

CONSUMER_KEY = ''
CONSUMER_SECRET = ''
ACCESS_TOKEN_KEY = ''
ACCESS_TOKEN_SECRET = ''


def get_twitter_api():
    api = twitter.Api(consumer_key=CONSUMER_KEY,
                      consumer_secret=CONSUMER_SECRET,
                      access_token_key=ACCESS_TOKEN_KEY,
                      access_token_secret=ACCESS_TOKEN_SECRET)
    #api.VerifyCredentials()
    return api


def get_timeline(id, count=100, page=1):
    # get user statuses
    api = get_twitter_api()
    statuses = api.GetUserTimeline(id=id, count=count, page=page)
    if not statuses:
        raise twitter.TwitterError('No Tweets')
    return {
        'name': statuses[0].user.name,
        'tweets': [status.text for status in statuses],
    }
# Cached Twitter (using TokyoCabinet)
class Twitter(object):
    def __init__(self, datafile='tweets.tch'):
        import tc
        self.db = tc.HDB(datafile, tc.HDBOWRITER | tc.HDBOCREAT)

    def close(self):
        self.db.close()

    def get_timeline(self, id, count=100, update=False):
        import json
        if (id not in self.db) or update:
            print 'Fetching %s\'s timeline..' % (id),
            try:
                timeline = get_timeline(id, count=count)
                print 'name = %s.' % (timeline['name'])
                self.db[id] = json.dumps(timeline)
            except twitter.TwitterError, e:
                print e
                self.db[id] = json.dumps({})
        return json.loads(self.db[id])

    def get_tweets(self, id, update=False):
        return self.get_timeline(id=id, update=update).get('tweets', [])

    def get_name(self, id, update=False):
        return self.get_timeline(id=id, update=update).get('name', '')

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
# Cleanup
import re

re_mention = re.compile(r'@\w+')
re_url = re.compile(r'https?://[\w./?=&#+\-]+')


def cleanup_tweet(tweet):
    tweet = re_mention.sub('', tweet)
    tweet = re_url.sub('', tweet)
    return tweet
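
# A minimal illustration of cleanup_tweet; the mention and URL below are
# made-up examples, not real accounts or links:
#
#   >>> cleanup_tweet(u'@someone check this out http://example.com/page #news')
#   u' check this out  #news'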
# MeCab
class MeCabTagger(object):
    def __init__(self):
        import MeCab
        self._tagger = MeCab.Tagger()

    def get_word(self, surface, feature):
        # Keep only nouns (名詞), skipping numerals (数) and suffixes (接尾);
        # return the base form (7th feature field) when available, otherwise
        # the surface form.
        fs = feature.split(",")
        if fs[0] == '名詞' and fs[1] not in ('数', '接尾'):
            return fs[6] != '*' and fs[6] or surface

    def text2words(self, text, kinds=None):
        try:
            node = self._tagger.parseToNode(text.encode('utf-8'))
        except RuntimeError, e:
            raise e
        words = []
        while node:
            word = self.get_word(node.surface, node.feature)
            if word:
                words.append(word)
            node = node.next
        return words
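
# Rough illustration of text2words (the sample sentence is made up and the
# exact output depends on the installed MeCab dictionary):
#
#   MeCabTagger().text2words(u'今日はいい天気です')  # "Nice weather today"
#
# would yield the nouns 今日 ("today") and 天気 ("weather"); particles,
# adjectives and auxiliaries are dropped by get_word.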

tagger = MeCabTagger()


# Others
def get_counts(keys):  # [A, A, B, B, A, ..] -> {A: 3, B: 2, ..}
    counts = {}
    for key in keys:
        if key not in counts:
            counts[key] = 0
        counts[key] += 1
    return counts

def tweet2features(tweet, tagger=tagger):  # "tweet text" -> {tweet: 1, text: 1}
    tweet = cleanup_tweet(tweet)
    return get_counts(tagger.text2words(tweet))


def get_features_labels(id_labels, update=False):  # [(twitter id, A), ..] -> [({word1: 3, word2: 2}, A), ..]
    with Twitter() as twitter:
        arr = []
        for id, label in id_labels:
            for tweet in twitter.get_tweets(id, update=update):
                features = tweet2features(tweet)
                arr.append((features, label))
        return arr


def read_id_labels(sourcefile):  # file -> [(twitter_id, label), ..]
    with open(sourcefile) as source:
        return [line.rstrip('\n').split(' ', 1) for line in source]
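
# read_id_labels expects one "twitter_id label" pair per line, split on the
# first space. A hypothetical trainfile (accounts and labels are made up):
#
#   some_account_a labelA
#   some_account_b labelB
#
# would yield [('some_account_a', 'labelA'), ('some_account_b', 'labelB')].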

def get_trained_classifier(trainfile, update=False):
    train_id_labels = read_id_labels(trainfile)
    # "Training from <trainfile>.. N entries"
    print trainfile, 'から学習中..', len(train_id_labels), '件'
    train_set = get_features_labels(train_id_labels, update=update)
    import nltk
    return nltk.NaiveBayesClassifier.train(train_set)

# Applications
def app_show(id, update=False, cleanup=False, **kwds):
    with Twitter() as twitter:
        for tweet in twitter.get_tweets(id, update=update):
            if cleanup:
                tweet = cleanup_tweet(tweet)
            print tweet


def app_words(id, update=False, **kwds):
    tagger = MeCabTagger()
    with Twitter() as twitter:
        for tweet in twitter.get_tweets(id, update=update):
            tweet = cleanup_tweet(tweet)
            for word in tagger.text2words(tweet):
                print word,


def app_classify(trainfile, id, update=False, **kwds):
    classifier = get_trained_classifier(trainfile, update=update)
    with Twitter() as twitter:
        tweets = twitter.get_tweets(id, update=update)
        name = twitter.get_name(id).encode('utf-8')
        features_list = [tweet2features(tweet) for tweet in tweets]
        if not features_list:
            # "Could not classify."
            print '判定不能でした。'
        else:
            labels = [classifier.classify(features) for features in features_list]
            results = get_counts(labels)
            # "<id> (<name>) seems to be <most frequent label>."
            print '%s (%s) は%sっぽいです。' % (id, name, max(results.items(), key=lambda x: x[1])[0])


def app_test(trainfile, testfile, update=False, **kwds):
    classifier = get_trained_classifier(trainfile, update=update)
    test_id_labels = read_id_labels(testfile)
    test_set = get_features_labels(test_id_labels, update=update)
    import nltk
    # "Accuracy is <value>"
    print '精度は', nltk.classify.accuracy(classifier, test_set)
    #classifier.show_most_informative_features()

USAGE = """\
%prog show id [--update] [--cleanup]      # show tweets for id
%prog words id [--update]                 # show words for id
%prog classify trainfile id [--update]    # classify id by trainfile
%prog test trainfile testfile [--update]  # test labels
"""

if __name__ == '__main__':
    import optparse
    op = optparse.OptionParser(usage=USAGE)
    op.add_option('--tagged', action='store_true', default=False, dest='tagged')
    op.add_option('--update', action='store_true', default=False, dest='update')
    op.add_option('--cleanup', action='store_true', default=False, dest='cleanup')
    opts, args = op.parse_args()
    if not args:
        op.print_usage()
    else:
        app = args[0]
        # dispatch to app_show / app_words / app_classify / app_test
        globals()['app_' + app](*args[1:], **opts.__dict__)
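
# Hypothetical invocations; the script and data file names below are
# placeholders, not files shipped with this gist:
#
#   python classify_tweets.py show some_account --cleanup
#   python classify_tweets.py classify train.txt some_account
#   python classify_tweets.py test train.txt test.txt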