Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Collect twitter followers
import tweepy
import time
import os
import sys
import json
import argparse
# Directory holding one tab-separated "<screen_name>.csv" friends file per
# crawled user.
FOLLOWING_DIR = 'following'
# Upper bound on the number of friends downloaded for a single user.
MAX_FRIENDS = 200
# Upper bound on how many of a user's friends are crawled recursively.
FRIENDS_OF_FRIENDS_LIMIT = 200

# Create the cache directory on first run.  The function is os.makedirs;
# os.makedir does not exist and raised AttributeError here.
if not os.path.exists(FOLLOWING_DIR):
    os.makedirs(FOLLOWING_DIR)


def enc(x):
    """Return the text *x* with every non-ASCII character removed.

    The result is decoded back to text so it can be joined with ordinary
    string literals (file names, format strings) on Python 2 and Python 3
    alike; a bare ``encode`` would yield ``bytes`` on Python 3.
    """
    return x.encode('ascii', 'ignore').decode('ascii')
# Twitter API credentials.
#
# Each value is read from the environment variable named in the lookup so
# that real secrets do not have to be committed to the source file; the
# placeholder string is used only when that variable is not set.
#
# The consumer keys can be found on your application's Details
# page located at https://dev.twitter.com/apps (under "OAuth settings")
CONSUMER_KEY = os.environ.get(
    'TWITTER_CONSUMER_KEY',
    'XXXXXXXXXXXXXXXXXXXXXXXXX')
CONSUMER_SECRET = os.environ.get(
    'TWITTER_CONSUMER_SECRET',
    'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')
# The access tokens can be found on your application's Details
# page located at https://dev.twitter.com/apps (located
# under "Your access token")
ACCESS_TOKEN = os.environ.get(
    'TWITTER_ACCESS_TOKEN',
    'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')
ACCESS_TOKEN_SECRET = os.environ.get(
    'TWITTER_ACCESS_TOKEN_SECRET',
    'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')
# == OAuth Authentication ==
#
# This mode of authentication is the new preferred way
# of authenticating with Twitter.
#
# The handler is built from the application's consumer key/secret and then
# given the account's access token/secret.
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# Module-level API client; get_follower_ids() and the command-line entry
# point below issue all of their Twitter requests through it.
api = tweepy.API(auth)
def _api_error_details(error):
    """Return the first error dict carried by a TweepError, or None."""
    try:
        details = error.args[0][0]
    except (IndexError, KeyError, TypeError):
        return None
    return details if isinstance(details, dict) else None


def _fetch_user(centre, userfname):
    """Download the profile of user *centre* and cache it in *userfname*.

    Returns the profile dict, or None when the user cannot be retrieved
    (not authorized, suspended, or any other non-rate-limit API error).
    Rate-limit errors are waited out and the request is retried.
    """
    print('Retrieving user details for twitter id %s' % str(centre))
    while True:
        try:
            user = api.get_user(centre)
            profile = {'name': user.name,
                       'screen_name': user.screen_name,
                       'id': user.id,
                       'friends_count': user.friends_count,
                       'followers_count': user.followers_count,
                       'followers_ids': user.followers_ids()}
            with open(userfname, 'w') as outf:
                outf.write(json.dumps(profile, indent=1))
            return profile
        except tweepy.TweepError as error:
            print(type(error))

            if str(error) == 'Not authorized.':
                print("Can't access user data - not authorized.")
                return None

            if str(error) == 'User has been suspended.':
                print('User suspended.')
                return None

            details = _api_error_details(error)
            print(details if details is not None else error)

            if (details is not None
                    and details.get('message') == 'Rate limit exceeded'):
                print('Rate limited. Sleeping for 15 minutes.')
                time.sleep(15 * 60 + 15)
                continue

            return None


def _fetch_friend_ids(user, screen_name, fname):
    """Page over the friends of *user*, writing them to *fname*.

    One "id<TAB>screen_name<TAB>name" line is written per friend, and at
    most MAX_FRIENDS friends are retrieved.  Returns the list of friend ids.
    """
    friendids = []
    print('No cached data for screen name "%s"' % screen_name)
    with open(fname, 'w') as outf:
        params = (enc(user['name']), screen_name)
        print('Retrieving friends for user "%s" (%s)' % params)

        # page over friends
        cursor = tweepy.Cursor(api.friends, id=user['id']).items()

        while True:
            try:
                friend = next(cursor)
            except StopIteration:
                break
            except tweepy.TweepError:
                # Treated as "hit rate limit": sleep for 15 minutes, retry.
                # NOTE(review): every TweepError is assumed to be a rate
                # limit here; a permanent error would retry forever --
                # confirm whether other errors can occur at this point.
                print('Rate limited. Sleeping for 15 minutes.')
                time.sleep(15 * 60 + 15)
                continue

            friendids.append(friend.id)
            params = (friend.id, enc(friend.screen_name), enc(friend.name))
            outf.write('%s\t%s\t%s\n' % params)
            if len(friendids) >= MAX_FRIENDS:
                # Report the user being crawled, not the last friend seen.
                print('Reached max no. of friends for "%s".' % screen_name)
                break
    return friendids


def _read_cached_friend_ids(fname):
    """Return the friend ids stored in the first column of *fname*."""
    with open(fname) as inf:
        return [int(line.strip().split('\t')[0])
                for line in inf if line.strip()]


def get_follower_ids(centre, max_depth=1, current_depth=0, taboo_list=None):
    """Crawl the friends network around the Twitter user id *centre*.

    The user's profile is cached as JSON under ``twitter-users/`` and, for
    screen names starting with ``TED``, the friends list is cached as a
    tab-separated file under FOLLOWING_DIR.  Friends are then crawled
    recursively until *max_depth* is reached.

    Args:
        centre: Numeric Twitter id of the user to start from.
        max_depth: Depth at which the recursion stops.
        current_depth: Depth of *centre* in the crawl (0 for the root).
        taboo_list: Ids already visited.  A list passed in is extended in
            place; a fresh list is created when omitted.

    Returns:
        The list of visited user ids.

    Raises:
        SystemExit: On any unexpected error while crawling; the friends
            file of the user being processed is removed first.
    """
    # A mutable default would be shared between independent calls.
    if taboo_list is None:
        taboo_list = []

    if current_depth == max_depth:
        print('out of depth')
        return taboo_list

    if centre in taboo_list:
        # we've been here before
        print('Already been here.')
        return taboo_list
    taboo_list.append(centre)

    # Bound before the try block so the error handler can always test it.
    fname = None
    try:
        users_dir = 'twitter-users'
        if not os.path.isdir(users_dir):
            os.makedirs(users_dir)

        userfname = os.path.join(users_dir, str(centre) + '.json')
        if os.path.exists(userfname):
            with open(userfname) as inf:
                user = json.load(inf)
        else:
            user = _fetch_user(centre, userfname)
            if user is None:
                return taboo_list

        screen_name = enc(user['screen_name'])
        fname = os.path.join(FOLLOWING_DIR, screen_name + '.csv')

        # only retrieve friends of TED... screen names
        if screen_name.startswith('TED'):
            if os.path.exists(fname):
                friendids = _read_cached_friend_ids(fname)
            else:
                friendids = _fetch_friend_ids(user, screen_name, fname)

            print('Found %d friends for %s' % (len(friendids), screen_name))

            # get friends of friends
            next_depth = current_depth + 1
            if next_depth < max_depth:
                for fid in friendids[:FRIENDS_OF_FRIENDS_LIMIT]:
                    taboo_list = get_follower_ids(fid, max_depth=max_depth,
                                                  current_depth=next_depth,
                                                  taboo_list=taboo_list)

                if len(friendids) > FRIENDS_OF_FRIENDS_LIMIT:
                    print('Not all friends retrieved for %s.' % screen_name)

    except Exception as error:
        print('Error retrieving followers for user id:  %s' % (centre,))
        print(error)

        # Drop the friends file so that a later run fetches it again.
        if fname is not None and os.path.exists(fname):
            os.remove(fname)
            print('Removed file "%s".' % fname)

        sys.exit(1)

    return taboo_list
if __name__ == '__main__':
    # Command-line entry point: resolve the screen name to a user id and
    # crawl that user's network down to the requested depth.
    ap = argparse.ArgumentParser()
    ap.add_argument("-s", "--screen-name", required=True,
                    help="Screen name of twitter user")
    ap.add_argument("-d", "--depth", required=True, type=int,
                    help="How far to follow user network")
    args = vars(ap.parse_args())

    twitter_screenname = args['screen_name']
    # argparse has already converted the value (type=int).
    depth = args['depth']

    if depth < 1 or depth > 3:
        print('Depth value %d is not valid. Valid range is 1-3.' % depth)
        sys.exit('Invalid depth argument.')

    print('Max Depth: %d' % depth)
    try:
        matches = api.lookup_users(screen_names=[twitter_screenname])
    except tweepy.TweepError as error:
        # NOTE(review): Twitter's users/lookup is documented to answer with
        # an error instead of an empty list when nothing matches; report it
        # and fall through to the "could not find" message -- confirm.
        print(error)
        matches = []

    if len(matches) == 1:
        print(get_follower_ids(matches[0].id, max_depth=depth))
    else:
        print('Sorry, could not find twitter user with screen name: %s'
              % twitter_screenname)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment