Last active
March 25, 2019 07:07
-
-
Save roytang/e3846e72ccf3e883d56734d3c365636e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def twitter_get(url): | |
resp = twitter.get(url) | |
if resp.status_code == 429: | |
# twitter rate limit was hit | |
print("Rate limit hit when calling %s" % (url)) | |
raise CustomException("Rate limit exceeded! You are using the API too often. Try again after 15 minutes or so.") | |
return resp | |
def intersection(list_a, list_b): | |
return [ e for e in list_a if e in list_b ] | |
@app.route('/twitter', methods=['GET', 'POST']) | |
def twitter_tools(): | |
screen_name = "" | |
if not twitter.authorized: | |
# return redirect(url_for("twitter.login")) | |
return render_template('twitter_login.html') | |
try: | |
resp = twitter_get("account/settings.json") | |
assert resp.ok | |
screen_name=resp.json()["screen_name"] | |
if request.method == 'POST': | |
# https://twitter.com/roytang/status/1087344612941824001 | |
message = "" | |
un1 = request.form.get("un1") | |
un2 = request.form.get("un2") | |
items = [] | |
fc1 = 0 | |
fc2 = 0 | |
results = False | |
if len(un1) == 0 or len(un2) == 0: | |
message = "Please input two usernames" | |
else: | |
resp = twitter_get("users/show.json?screen_name="+un1) | |
fc1 = resp.json()["friends_count"] | |
if fc1 > CURSOR_LIMIT: | |
raise CustomException("%s has too many follows (%s) for this operation, sorry!" % (un1, fc1)) | |
follows1 = get_friends(un1) | |
resp = twitter_get("users/show.json?screen_name="+un2) | |
fc2 = resp.json()["followers_count"] | |
if fc2 > CURSOR_LIMIT: | |
raise CustomException("%s has too many followers (%s) for this operation, sorry!" % (un2, fc2)) | |
followers2 = get_followers(un2) | |
intersect = intersection(follows1, followers2) | |
for i in intersect: | |
resp = twitter_get("users/show.json?user_id="+str(i)) | |
userinfo = resp.json() | |
items.append(userinfo["screen_name"]) | |
results = True | |
return render_template('twitter.html', screen_name=screen_name, message=message, un1=un1, un2=un2, items=items, fc1=fc1, fc2=fc2, results=results) | |
return render_template('twitter.html', screen_name=screen_name) | |
except CustomException as e: | |
return render_template('twitter.html', screen_name=screen_name, message=str(e)) | |
def get_cursor_list(base_url, json_elem): | |
l = [] | |
next_cursor = -1 | |
while next_cursor != 0: | |
url = base_url | |
if next_cursor > 0: | |
url = url + "&cursor=" + str(next_cursor) | |
resp = twitter_get(url) | |
l.extend(resp.json()[json_elem]) | |
next_cursor = resp.json()["next_cursor"] | |
return l | |
def get_followers(screen_name): | |
return get_cursor_list("followers/ids.json?screen_name="+screen_name, "ids") | |
def get_friends(screen_name): | |
return get_cursor_list("friends/ids.json?screen_name="+screen_name, "ids") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment