Created
July 13, 2020 21:29
-
-
Save BenderV/f4d13d4b1a033d0dedea4747bd6deb48 to your computer and use it in GitHub Desktop.
Reset twitter following list to the top 100
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Warning

Dangerous script.
This will unfollow lots of people...
"""
import datetime | |
from collections import defaultdict | |
import os | |
import tweepy | |
# OAuth credentials. Each value is read from the environment variable of the
# same purpose when it is set, and otherwise falls back to an empty string
# (fill these in, or export the variables, before running the script).
TWITTER_CONSUMER_KEY = os.environ.get("TWITTER_CONSUMER_KEY", "")
TWITTER_CONSUMER_SECRET = os.environ.get("TWITTER_CONSUMER_SECRET", "")
access_token_key = os.environ.get("TWITTER_ACCESS_TOKEN_KEY", "")
access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "")

auth = tweepy.OAuthHandler(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET)
auth.set_access_token(access_token_key, access_token_secret)

# Raw tweepy client. It is deliberately NOT wrapped in ``TwitterAPI`` here:
# that class is defined further down in this module, so referencing it at
# this point raises NameError on import, and its constructor performs a
# network call. Build the wrapper with ``TwitterAPI(client)`` only after the
# class definition (for instance inside the ``__main__`` guard).
client = tweepy.API(auth)
def author_freq(author):
    """Return the average number of hours between two tweets of ``author``.

    The value is the account age divided by ``author.statuses_count``, so a
    larger result means the author tweets more rarely.

    Args:
        author: object exposing ``created_at`` (a ``datetime``),
            ``statuses_count`` (an int) and ``name``.

    Returns:
        The average interval in hours as a float, or ``0.0`` when the author
        has no statuses (the author's name is printed in that case).
    """
    created_at = author.created_at
    if created_at.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to be UTC (Twitter
        # reports UTC) -- confirm for the tweepy version in use.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    else:
        # Aware timestamps must be compared with an aware "now".
        now = datetime.datetime.now(created_at.tzinfo)

    try:
        interval = (now - created_at) / author.statuses_count
    except ZeroDivisionError:
        print(author.name)
        return 0.0

    # total_seconds() includes the days component; ``.seconds`` alone wraps
    # around every 24 hours and under-reports infrequent authors.
    return interval.total_seconds() / 3600
class TwitterAPI:
    """Wrapper around a tweepy ``API`` client.

    Provides helpers to download the authenticated user's tweets and
    favorites, score the authors found in them, and follow/unfollow accounts
    based on that ranking.
    """

    def __init__(self, client):
        """Store ``client`` and look up the authenticated user's handle.

        Args:
            client: tweepy ``API`` instance; ``client.me()`` is called here.
        """
        self.client = client
        # Keep the handle (screen_name), not the display name: it is compared
        # with ``tweet.author.screen_name`` when scoring.
        self.screen_name = client.me().screen_name

    @staticmethod
    def _fetch_all(fetch, **params):
        """Page backwards through ``fetch`` until it returns nothing.

        ``fetch`` is called with ``count=200`` and, after the first page,
        with ``max_id`` set to one less than the id of the oldest tweet
        collected so far. Extra ``params`` are forwarded on every call.
        """
        all_tweets = list(fetch(count=200, **params))
        new_tweets = all_tweets[:]
        # keep grabbing tweets until there are no tweets left to grab
        while new_tweets:
            oldest = all_tweets[-1].id - 1
            print("getting tweets before %s" % (oldest))
            new_tweets = fetch(count=200, max_id=oldest, **params)
            all_tweets.extend(new_tweets)
            print("...%s tweets downloaded so far" % (len(all_tweets)))
        return all_tweets

    def get_all_timeline_tweets(self):
        """Return every tweet ``client.user_timeline`` yields for the caller."""
        return self._fetch_all(self.client.user_timeline)

    def get_all_favorite_tweets(self, screen_name=None):
        """Return every favorite of ``screen_name``.

        When ``screen_name`` is None no screen name is sent, so the request
        is made for the authenticated user.
        """
        params = {} if screen_name is None else {"screen_name": screen_name}
        return self._fetch_all(self.client.favorites, **params)

    def get_current_follows(self):
        """Return the screen names yielded by one ``client.friends()`` call."""
        # NOTE(review): a single friends() call presumably returns only the
        # first page of follows -- confirm and paginate if the full list is
        # needed.
        return [f.screen_name for f in self.client.friends()]

    def get_authors_scores(self):
        """Score the authors of the user's timeline and favorite tweets.

        Points: +1 for a favorited tweet whose author is not the
        authenticated user, +1 for the original author of a retweet, and +1
        more if that original tweet is favorited. Authors are ranked by
        ``points * freq`` (descending) and the top 100 are printed.

        Returns:
            A list of dicts, best first, with keys ``name``, ``points``,
            ``freq`` and ``status`` (the author's ``_json`` payload).
        """
        tweets = self.get_all_timeline_tweets()
        tweets += self.get_all_favorite_tweets()

        # Keyed by user id: display names are not unique, so keying on them
        # would merge unrelated accounts.
        authors = {}

        def entry_for(author):
            if author.id not in authors:
                authors[author.id] = {
                    "name": author.name,
                    "points": 0,
                    "freq": author_freq(author),
                    "status": author,
                }
            return authors[author.id]

        for tweet in tweets:
            entry = entry_for(tweet.author)
            if tweet.favorited and tweet.author.screen_name != self.screen_name:
                entry["points"] += 1
            if hasattr(tweet, "retweeted_status"):
                original = tweet.retweeted_status
                original_entry = entry_for(original.author)
                original_entry["points"] += 1
                if original.favorited:
                    original_entry["points"] += 1

        ranked = sorted(
            authors.values(), key=lambda rec: -rec["points"] * rec["freq"]
        )

        for rec in ranked[:100]:
            print("{:>2}: @{}".format(rec["points"], rec["status"].screen_name))

        return [
            {
                "name": rec["name"],
                "points": rec["points"],
                "freq": rec["freq"],
                "status": rec["status"]._json,
            }
            for rec in ranked
        ]

    def del_all_tweets(self, tweets):
        """Delete every status in ``tweets``.

        Warning: this really deletes the tweets. A failure on one status is
        reported and does not stop the loop.
        """
        for status in tweets:
            try:
                self.client.destroy_status(status.id)
            except Exception as exc:  # best effort: report and carry on
                print("Failed to delete:", status.id, exc)
            else:
                print("Deleted:", status.id)

    @staticmethod
    def _author_identity(author):
        """Return ``(user_id, display_name)`` for a score record or user object."""
        if isinstance(author, dict):
            # Record shape produced by get_authors_scores().
            return author["status"]["id"], author["name"]
        return author.id, author.name

    def cut_the_noise(self, authors, keep=100):
        """Follow only the ``keep`` most interesting authors.

        Args:
            authors: authors ordered best first, either records returned by
                ``get_authors_scores`` or objects with ``id`` and ``name``.
            keep: how many leading authors to follow; every author after
                them is unfollowed.
        """
        for ind, author in enumerate(authors):
            user_id, name = self._author_identity(author)
            if ind < keep:
                print("Follow : {}".format(name))
                self.client.get_user(id=user_id).follow()
            else:
                print("Unfollow : {}".format(name))
                self.client.get_user(id=user_id).unfollow()

        # Unfinished alternative kept from the original author: reconcile the
        # ranking against the accounts currently followed.
        #
        # current_follow = self.client.friends_ids()
        # to_follow = [r[1]["status"].id for r in results[:100]]
        # not_follow = [r[1]["status"].id for r in results[100:]]
        # # Unfollow noisy people
        # for follow in current_follow:
        #     if follow not in to_follow:
        #         user = self.client.get_user(id=follow)
        #         # user.unfollow()
        # # Follow the selected people
        # for follow in to_follow:
        #     if follow not in current_follow:
        #         user = self.client.get_user(id=follow)
        #         # user.follow()
def follow_the_white_rabbits(screen_names=None, api=None):
    """Collect the favorite tweets of a list of users.

    Intended as the first step of "follow the overall top follows of these
    users"; only the collection step exists so far, so nobody is followed
    or unfollowed here.

    Args:
        screen_names: handles whose favorites are fetched. Defaults to the
            built-in list below.
        api: object exposing ``get_all_favorite_tweets(screen_name)``.
            Defaults to the module-level ``client``, wrapped in
            ``TwitterAPI`` if it is not one already.

    Returns:
        A list with the favorites of every user, in the order given.
    """
    if screen_names is None:
        screen_names = [
            "elonmusk",
            "naval",
            "karpathy",
            "patrickc",
        ]
    if api is None:
        api = client if isinstance(client, TwitterAPI) else TwitterAPI(client)

    tweets = []
    for follow_screen_name in screen_names:
        tweets += api.get_all_favorite_tweets(follow_screen_name)
    return tweets
if __name__ == '__main__':
    # DANGEROUS: this follows the top-ranked authors and unfollows the rest.
    # Make sure we operate on the TwitterAPI wrapper; the module-level
    # ``client`` may still be the raw tweepy client.
    api = client if isinstance(client, TwitterAPI) else TwitterAPI(client)
    authors = api.get_authors_scores()
    api.cut_the_noise(authors)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment