Created
March 24, 2013 19:01
-
-
Save sl8r000/5233062 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib2 | |
import json | |
import time | |
class QueryBuilder(object):
    """Build and run a query against Twitter's ``user_timeline`` endpoint.

    Query-string parameters are kept in ``self._query_params`` and exposed
    through validated properties:

    * ``include_entities`` (bool) -- stored as ``'true'`` / ``'false'``.
    * ``screen_name`` (str) -- reads back as ``None`` until it is set.
    * ``count`` (int) -- how many tweets ``get_tweets`` returns at most.
      The request itself always asks for ``DEFAULT_MAX_COUNT`` tweets; the
      response is truncated locally to ``count``.
    * ``contributor_details`` (bool) -- only sent once it has been set;
      reads back as ``False`` before that.
    """

    # NOTE(review): this is the unauthenticated v1 endpoint -- confirm that it
    # is still served before relying on this class.
    BASE_URL = 'https://api.twitter.com/1/statuses/user_timeline.json?'
    DEFAULT_MAX_COUNT = 200

    def __init__(self):
        self._query_params = {
            'include_entities': 'false',
            'screen_name': '',
            'count': QueryBuilder.DEFAULT_MAX_COUNT,
            # Was misspelled 'includ_rts', so the flag never reached the API
            # under the name it documents.
            'include_rts': 'false',
        }
        # Local cap applied to the response; always held as an int.
        self._actual_count = QueryBuilder.DEFAULT_MAX_COUNT

    @property
    def include_entities(self):
        """Whether the ``include_entities`` parameter is ``'true'``."""
        return self._query_params['include_entities'] == 'true'

    @include_entities.setter
    def include_entities(self, value):
        assert isinstance(value, bool), 'include_entities must be a boolean'
        self._query_params['include_entities'] = 'true' if value else 'false'

    @property
    def screen_name(self):
        """The screen name to query, or ``None`` while it is still empty."""
        name = self._query_params['screen_name']
        return None if name == '' else name

    @screen_name.setter
    def screen_name(self, value):
        assert isinstance(value, str), 'screen_name must be a string'
        self._query_params['screen_name'] = value

    @property
    def count(self):
        """Maximum number of tweets ``get_tweets`` will return."""
        return int(self._actual_count)

    @count.setter
    def count(self, value):
        assert isinstance(value, int), 'count must be an integer'
        self._actual_count = value

    @property
    def contributor_details(self):
        """Whether ``contributor_details`` has been set to ``'true'``.

        Returns ``False`` when the parameter was never set (previously this
        raised ``KeyError`` because the key is not created in ``__init__``).
        """
        return self._query_params.get('contributor_details') == 'true'

    @contributor_details.setter
    def contributor_details(self, value):
        assert isinstance(value, bool), 'contributor_details must be a boolean'
        self._query_params['contributor_details'] = (
            'true' if value else 'false')

    def _build_url(self):
        """Return ``BASE_URL`` followed by the URL-encoded query parameters."""
        try:
            from urllib import urlencode  # Python 2
        except ImportError:
            from urllib.parse import urlencode  # Python 3
        # urlencode escapes reserved characters, so a value containing '&'
        # or '=' cannot inject extra parameters into the query string.
        return '{}{}'.format(QueryBuilder.BASE_URL,
                             urlencode(self._query_params))

    def get_tweets(self):
        """Fetch the timeline and return at most ``count`` decoded tweets.

        Returns:
            The leading slice of the JSON-decoded response, at most
            ``self.count`` items long.

        Raises:
            Exception: if ``screen_name`` has not been set.
        """
        if self.screen_name is None:
            raise Exception('screen_name must be specified')
        response = urllib2.urlopen(self._build_url())
        try:
            received_tweets = json.load(response)
        finally:
            # Always release the connection, even if decoding fails.
            response.close()
        # Clamp at zero: a negative bound would slice from the end and
        # silently drop tweets instead of returning none.
        limit = max(0, min(len(received_tweets), self.count))
        return received_tweets[:limit]
def make_record_from_tweet(tweet):
    """Build a flat feature record (a dict) from a single tweet.

    Each key of the returned dict is filled by calling the matching
    ``get_*`` extractor of this module on ``tweet``, in the order listed.
    """
    extractors = (
        ('creation_time_of_day', get_creation_time_of_day),
        ('hashtag_count', get_hashtag_count),
        ('url_count', get_url_count),
        ('user_mentions_count', get_user_mentions_count),
        ('retweeted', get_retweeted),
        ('retweet_count', get_retweet_count),
        ('source', get_source),
        ('text_length', get_text_length),
        ('text_length_sans_entities', get_text_length_sans_entities),
        ('avg_text_word_length', get_avg_text_word_length),
        ('exclamation_point_count', get_exclamation_point_count),
        ('question_mark_count', get_question_mark_count),
        ('period_count', get_period_count),
        ('comma_count', get_comma_count),
        ('semicolon_count', get_semicolon_count),
        ('ellipsis_count', get_ellipsis_count),
        ('tweeter', get_tweeter),
    )
    record = dict()
    for field_name, extract in extractors:
        record[field_name] = extract(tweet)
    return record
def get_creation_time_of_day(the_tweet):
    """Return the tweet's creation time of day as fractional hours.

    The ``HH:MM:SS`` portion of ``the_tweet['created_at']`` is located via
    its first and last colons. The result is ``hour + minute / 60.0``; the
    seconds field is parsed for validation but does not contribute, which
    keeps the returned value identical to earlier versions.

    Raises:
        ValueError: if no ``H:M:S`` clock field can be parsed.
    """
    created_at = the_tweet['created_at']
    first_colon = created_at.find(':')
    last_colon = created_at.rfind(':')
    # Clamp the start so a single-digit hour at the very beginning of the
    # string does not turn into a negative (from-the-end) slice index.
    start_index = max(0, first_colon - 2)
    # +3 covers both digits of the seconds field (the old +2 cut one off).
    end_index = last_colon + 3
    clock_fields = created_at[start_index:end_index].split(':')
    hour, minute, _second = [int(field) for field in clock_fields]
    return hour + minute / 60.0
def get_hashtag_count(the_tweet):
    """Return how many entries ``the_tweet['entities']['hashtags']`` holds."""
    hashtags = the_tweet['entities']['hashtags']
    return len(hashtags)
def get_url_count(the_tweet):
    """Return how many entries ``the_tweet['entities']['urls']`` holds."""
    urls = the_tweet['entities']['urls']
    return len(urls)
def get_user_mentions_count(the_tweet):
    """Return how many entries ``the_tweet['entities']['user_mentions']`` holds."""
    mentions = the_tweet['entities']['user_mentions']
    return len(mentions)
def get_retweeted(the_tweet):
    """Return the truthiness of ``the_tweet['retweeted']`` as a bool."""
    retweeted_flag = the_tweet['retweeted']
    return True if retweeted_flag else False
def get_retweet_count(the_tweet):
    """Return ``the_tweet['retweet_count']`` converted with ``int``."""
    raw_count = the_tweet['retweet_count']
    return int(raw_count)
def get_source(the_tweet):
    """Return ``the_tweet['source']`` with every space character removed."""
    source = the_tweet['source']
    # Splitting on a literal space and re-joining drops exactly the spaces.
    return ''.join(source.split(' '))
def get_text_length(the_tweet):
    """Return the length of ``the_tweet['text']``."""
    text = the_tweet['text']
    return len(text)
def _get_text_sans_entities(the_tweet): # Warning: O(n^2) | |
original_text = the_tweet['text'] | |
intervals_to_cut = [] | |
intervals_to_cut += [range(*hashtag['indices']) for hashtag in | |
the_tweet['entities']['hashtags']] | |
intervals_to_cut += [range(*url['indices']) for url in | |
the_tweet['entities']['urls']] | |
intervals_to_cut += [range(*user['indices']) for user in | |
the_tweet['entities']['user_mentions']] | |
all_indices_to_cut = sum(intervals_to_cut, []) | |
modified_text = ''.join([s for i, s in enumerate(original_text) | |
if i not in all_indices_to_cut]) | |
modified_text = ' '.join(modified_text.split()) | |
return modified_text | |
def get_text_length_sans_entities(the_tweet):
    """Return the length of the tweet text after entity spans are stripped."""
    stripped_text = _get_text_sans_entities(the_tweet)
    return len(stripped_text)
def get_avg_text_word_length(the_tweet):
    """Return the mean word length of the entity-stripped tweet text.

    Words are the whitespace-separated tokens of the stripped text.
    Returns ``0`` when there are no words; otherwise a float average.
    """
    # Strip entities once instead of recomputing the text for every use.
    words = _get_text_sans_entities(the_tweet).split()
    if not words:
        return 0
    total_chars = sum(len(word) for word in words)
    # float() forces true division; int / int would truncate the average
    # on interpreters where '/' is floor division for integers.
    return float(total_chars) / len(words)
def get_exclamation_point_count(the_tweet):
    """Count '!' characters in the tweet text after entity spans are stripped."""
    stripped_text = _get_text_sans_entities(the_tweet)
    return stripped_text.count('!')
def get_question_mark_count(the_tweet):
    """Count '?' characters in the tweet text after entity spans are stripped."""
    stripped_text = _get_text_sans_entities(the_tweet)
    return stripped_text.count('?')
def get_period_count(the_tweet):
    """Count periods that are not accounted for by three-dot ellipses.

    Computed as the number of '.' characters in the entity-stripped text
    minus three for every ellipsis reported by ``get_ellipsis_count``.
    """
    stripped_text = _get_text_sans_entities(the_tweet)
    total_dots = stripped_text.count('.')
    dots_in_ellipses = 3 * get_ellipsis_count(the_tweet)
    return total_dots - dots_in_ellipses
def get_comma_count(the_tweet):
    """Count ',' characters in the tweet text after entity spans are stripped."""
    stripped_text = _get_text_sans_entities(the_tweet)
    return stripped_text.count(',')
def get_semicolon_count(the_tweet):
    """Count ';' characters in the tweet text after entity spans are stripped."""
    stripped_text = _get_text_sans_entities(the_tweet)
    return stripped_text.count(';')
def get_ellipsis_count(the_tweet):
    """Count non-overlapping '...' runs in the entity-stripped tweet text."""
    stripped_text = _get_text_sans_entities(the_tweet)
    return stripped_text.count('...')
def get_tweeter(the_tweet):
    """Return ``the_tweet['user']['screen_name']``."""
    user = the_tweet['user']
    return user['screen_name']
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment