import os
import urlparse

import boto3
from flask import Flask, jsonify, request
import requests
from requests_oauthlib import OAuth1
app = Flask(__name__)

# Twitter application credentials. The literals are placeholders; real values
# belong in deployment configuration, not in a file that gets shared or
# committed.
consumer_key = u'GET_YOUR_OWN_KEYS'
consumer_secret = u'GET_SOME_KEYS'

# Twitter REST and OAuth 1.0a endpoints used by the handlers below.
base_url = 'https://api.twitter.com/1.1/'
request_token_url = 'https://api.twitter.com/oauth/request_token'
access_token_url = 'https://api.twitter.com/oauth/access_token'
authorize_url = 'https://api.twitter.com/oauth/authenticate'

# NOTE(review): plain-http callback; nothing else in this file references
# `callback`, so confirm where (or whether) it is actually used.
callback = 'http://daysuntilreinvent.com/'

# Application-only signer (consumer credentials, no user token).
oauth = OAuth1(consumer_key, client_secret=consumer_secret)

# DynamoDB storage: `token_table` receives the per-user access tokens,
# `auth_table` holds request tokens between /login and /access_token.
dynamodb = boto3.resource('dynamodb')
token_table, auth_table = (
    dynamodb.Table('daysuntil_users1'),
    dynamodb.Table('daysuntil_auth_table'),
)
@app.after_request
def after_request(response):
    """Add the CORS headers for the front-end origin to an outgoing response.

    TODO: the allowed origin is plain http; serve the site over HTTPS and
    switch the origin to match.
    """
    cors_headers = (
        ('Access-Control-Allow-Origin', 'http://daysuntilreinvent.com'),
        ('Access-Control-Allow-Headers', 'Content-Type,Authorization'),
        ('Access-Control-Allow-Methods', 'GET,POST,OPTIONS'),
    )
    for header_name, header_value in cors_headers:
        response.headers.add(header_name, header_value)
    return response
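@app.route('/access_token', methods=["POST"])
def access_token():
    """Exchange an authorized request token for a Twitter access token.

    Expects the form fields ``oauth_token`` and ``oauth_verifier``. The
    request-token secret is looked up in ``auth_table`` (written by
    ``/login``), so the client does not supply it.

    On success, the fields returned by Twitter plus the user's profile image
    URL are stored in ``token_table`` and the pending ``auth_table`` row is
    deleted. The JSON response contains only ``screen_name`` and
    ``profile_image_url_https``.

    Every failure path returns ``{"error": ...}`` with HTTP 400 and leaves
    the pending ``auth_table`` row in place.
    """
    oauth_token = request.form.get('oauth_token')
    oauth_verifier = request.form.get('oauth_verifier')
    if not oauth_token or not oauth_verifier:
        return jsonify({'error': 'oauth_token and oauth_verifier are required'}), 400

    # Only tokens handed out by /login have a row here. Treat an unknown
    # token as a client error instead of letting the missing 'Item' key
    # surface as an unhandled KeyError.
    pending = auth_table.get_item(Key={'oauth_token': oauth_token}).get('Item')
    if pending is None:
        return jsonify({'error': 'unknown oauth_token'}), 400

    auth = OAuth1(
        consumer_key,
        client_secret=consumer_secret,
        resource_owner_key=oauth_token,
        resource_owner_secret=pending['oauth_token_secret'],
        verifier=oauth_verifier
    )
    resp = requests.post(access_token_url, auth=auth)
    if resp.status_code != 200:
        return jsonify({'error': resp.text}), 400

    # Form-encoded body; per the original author's note it carries
    # oauth_token, oauth_token_secret, screen_name and expires.
    parsed_resp = dict(urlparse.parse_qsl(resp.content))
    if 'screen_name' not in parsed_resp:
        return jsonify({'error': 'access token response did not include screen_name'}), 400

    # Look up the profile picture with the application-only signer and check
    # the status before trusting the JSON body.
    profile_resp = requests.get(
        base_url + "users/show.json",
        params={'screen_name': parsed_resp['screen_name']},
        auth=oauth
    )
    if profile_resp.status_code != 200:
        return jsonify({'error': profile_resp.text}), 400
    parsed_resp['profile_image_url_https'] = profile_resp.json()['profile_image_url_https']

    # NOTE(review): this stores the user's access token and secret exactly as
    # returned, with no application-level protection. Read access to this
    # table is equivalent to holding those users' tokens, so keep its access
    # policy narrow.
    token_table.put_item(Item=parsed_resp)

    # The request token has been consumed; drop the pending row.
    auth_table.delete_item(Key={'oauth_token': oauth_token})

    # Only non-secret profile fields go back to the browser.
    return jsonify(
        {
            'screen_name': parsed_resp['screen_name'],
            'profile_image_url_https': parsed_resp['profile_image_url_https']
        })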
@app.route('/login')
def login():
    """Start the sign-in flow by requesting a Twitter request token.

    The token and its secret are written to ``auth_table`` so that
    ``/access_token`` can find the secret later. Returns the parsed
    request-token response as JSON, or ``{"error": ...}`` with HTTP 400 when
    Twitter does not answer with 200.
    """
    token_resp = requests.post(url=request_token_url, auth=oauth)
    if token_resp.status_code != 200:
        return jsonify({"error": token_resp.text}), 400

    token_fields = dict(urlparse.parse_qsl(token_resp.text))

    # Kept only so the follow-up /access_token call can be matched to this
    # request; that handler deletes the row once the exchange succeeds.
    pending_row = {
        'oauth_token': token_fields['oauth_token'],
        'oauth_token_secret': token_fields['oauth_token_secret'],
    }
    auth_table.put_item(Item=pending_row)

    # NOTE(review): the whole parsed response, including oauth_token_secret,
    # is returned to the caller. /access_token reads the secret from
    # auth_table, so the browser may not need it; confirm against the
    # front-end before trimming the response.
    return jsonify(token_fields)
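@app.route('/logout', methods=['POST'])
def logout():
    """Delete the stored token row for the ``screen_name`` form field.

    Returns the plain string ``"logged out"`` on success, or
    ``{"error": ...}`` with HTTP 400 when ``screen_name`` is missing or
    empty.
    """
    screen_name = request.form.get('screen_name')
    if not screen_name:
        # A bare integer is not a valid Flask view return value; send a real
        # 400 response in the same JSON shape the other handlers use.
        return jsonify({'error': 'screen_name is required'}), 400

    # NOTE(review): nothing here ties the caller to the named account, so any
    # client that can POST to this route can remove any user's stored tokens.
    # Bind this to an authenticated session before relying on it.
    token_table.delete_item(Key={'screen_name': screen_name})
    return "logged out"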
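if __name__ == '__main__':
    # Debug mode turns on the Werkzeug interactive debugger, which runs code
    # for anyone who can reach the server. Keep it off unless it is requested
    # explicitly for local development with FLASK_DEBUG=1.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1')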