Skip to content

Instantly share code, notes, and snippets.

@pauleveritt
Created October 27, 2014 17:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pauleveritt/098231b8bd95c97b03a2 to your computer and use it in GitHub Desktop.
Save pauleveritt/098231b8bd95c97b03a2 to your computer and use it in GitHub Desktop.
"""
Authentication schemes for Moonrock:
- Username/password
- Twitter
"""
from urllib.parse import parse_qsl
from urllib.parse import urlencode
import requests
from requests_oauthlib import OAuth1
from rest_toolkit import resource
from pyramid.httpexceptions import HTTPFound, HTTPUnauthorized
from .security import create_jwt_token
from pyramid_sqlalchemy import Session
from .models.users import User
@resource('/api/auth')
class AuthResource:
request_token_url = 'https://api.twitter.com/oauth/request_token'
access_token_url = 'https://api.twitter.com/oauth/access_token'
authenticate_url = 'https://api.twitter.com/oauth/authenticate'
def __init__(self, request):
self.request = request
settings = request.registry.settings
self.consumer_key = settings.get('TWITTER_CONSUMER_KEY')
self.client_secret = settings.get('TWITTER_CONSUMER_SECRET')
self.callback_uri = settings.get('TWITTER_CALLBACK_URL')
self.token_secret = settings.get('TOKEN_SECRET')
@AuthResource.controller(name='login', request_method='POST')
def login_resource(resource, request):
settings = request.registry.settings
token_secret = settings.get('TOKEN_SECRET')
username = request.json_body.get('username')
password = request.json_body.get('password')
user = Session.query(User).filter(User.username == username).one()
if user:
user_password = user.password
if user_password and user_password == password:
token = create_jwt_token(user, token_secret)
return dict(token=token)
# Otherwise bail out with an HTTP exception
raise HTTPUnauthorized(detail='Invalid username or password')
@AuthResource.controller(name='twitter', request_method='GET')
def twitter_get_url(resource, request):
"""Step 1 of OAuth1 dance"""
print('\n###step 1')
oauth = OAuth1(resource.consumer_key,
client_secret=resource.client_secret,
callback_uri=resource.callback_uri)
r = requests.post(resource.request_token_url, auth=oauth)
oauth_token = dict(parse_qsl(r.text))
qs = urlencode(dict(oauth_token=oauth_token['oauth_token']))
url = resource.authenticate_url + '?' + qs
return HTTPFound(location=url)
@AuthResource.controller(name='twitter', request_method='GET',
request_param=['oauth_token', 'oauth_verifier'])
def twitter_process_params(resource, request):
"""Step 2 of OAuth1 dance"""
print('\n###step 2')
auth = OAuth1(resource.consumer_key,
client_secret=resource.client_secret,
resource_owner_key=request.params.get(
'oauth_token'),
verifier=request.params.get('oauth_verifier'))
r = requests.post(resource.access_token_url, auth=auth)
profile = dict(parse_qsl(r.text))
user = Session.query(User).filter(
User.twitter == profile['screen_name']).one()
token = create_jwt_token(user, resource.token_secret)
return dict(token=token)
def includeme(config):
config.scan('.auth')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment