Created
May 7, 2014 03:23
-
-
Save adamlwgriffiths/2d6510916cdc4623f770 to your computer and use it in GitHub Desktop.
Testing flask security auth issues
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, Blueprint, Response, request, request_finished, request_started | |
from flask.views import MethodView | |
from flask.ext.security import Security, MongoEngineUserDatastore, UserMixin, RoleMixin, current_user, auth_required, login_required | |
from flask.ext.security.utils import login_user, verify_password, encrypt_password | |
from flask.ext.security.decorators import _check_token | |
import mongoengine | |
from mongoengine import StringField, BooleanField, DateTimeField, ListField, ReferenceField, LongField, ObjectIdField | |
from flask.ext.mongoengine import MongoEngine, Document | |
import json | |
# --- Experiment toggles -----------------------------------------------------

# When true, the views and request signal handlers call _check_token()
# before printing the current user a second time.
BUMP_TOKEN = True

# Passed to app.test_client(use_cookies=...).
ENABLE_COOKIES = True

# Header name used for SECURITY_TOKEN_AUTHENTICATION_HEADER and for the token
# header added to responses. Alternative tried: 'Authentication-token'.
TOKEN = 'X-Auth-Token'

# Arguments handed to auth_required(). Alternative tried: ['Token'].
AUTH_METHODS = ['Token', 'Session']

# Which decorators auth_deco() applies to the protected view.
USE_LOGIN_REQUIRED = True
USE_AUTH_REQUIRED = False
def auth_deco(fn):
    """Wrap ``fn`` with the auth decorators selected by the module toggles.

    ``login_required`` is applied first (innermost) when USE_LOGIN_REQUIRED
    is set; ``auth_required(*AUTH_METHODS)`` is applied on top of it when
    USE_AUTH_REQUIRED is set. With both toggles off, ``fn`` is returned
    unchanged.
    """
    selected = []
    if USE_LOGIN_REQUIRED:
        selected.append(login_required)
    if USE_AUTH_REQUIRED:
        selected.append(auth_required(*AUTH_METHODS))

    wrapped = fn
    for decorate in selected:
        wrapped = decorate(wrapped)
    return wrapped
class Role(Document, RoleMixin):
    """Role document; used as the role model of the user datastore."""

    # Role name, at most 80 characters, declared unique.
    name = StringField(unique=True, max_length=80)
    # Optional free-text description, at most 255 characters.
    description = StringField(max_length=255)
class User(Document, UserMixin):
    """User document; used as the user model of the user datastore."""

    # Credentials and identity.
    email = StringField(required=True, max_length=255, unique=True)
    password = StringField(required=True, max_length=255)
    display_name = StringField(max_length=255)

    # Account state.
    active = BooleanField(default=True)
    confirmed_at = DateTimeField()
    anonymous = BooleanField(default=False)

    # A callable default gives every document its own fresh list instead of
    # one list object created once at class-definition time.
    roles = ListField(ReferenceField(Role, dbref=False), default=list)

    # Login tracking fields.
    last_login_at = DateTimeField()
    current_login_at = DateTimeField()
    login_count = LongField()
def make_json_response(body_dict, status):
    """Return a ``Response`` whose body is ``body_dict`` serialised as JSON.

    :param body_dict: object passed to ``json.dumps`` to form the body.
    :param status: HTTP status for the response.
    """
    payload = json.dumps(body_dict)
    return Response(
        response=payload,
        status=status,
        mimetype='application/json; charset=UTF-8',
    )
class Authenticated(MethodView):
    """View guarded by ``auth_deco``; prints the current user and returns 200."""

    # Decorators tried directly before auth_deco was introduced:
    #   @login_required
    #   @auth_required(*AUTH_METHODS)
    @auth_deco
    def get(self):
        """Print the user before and after the optional token check."""
        # No space after the tab: Python 2's print statement emits no
        # separator after a tab, so this matches the original output.
        user_line = '\t%s %s'

        print('Authenticated Get')
        print(user_line % (current_user, current_user.is_authenticated()))
        # NOTE(review): source indentation was lost; assumed only the
        # _check_token() call is conditional on BUMP_TOKEN.
        if BUMP_TOKEN:
            _check_token()
        print(user_line % (current_user, current_user.is_authenticated()))
        return make_json_response({}, 200)
class Anonymous(MethodView):
    """Undecorated view; prints the current user and returns 200."""

    def get(self):
        """Print the user before and after the optional token check."""
        # No space after the tab: Python 2's print statement emits no
        # separator after a tab, so this matches the original output.
        user_line = '\t%s %s'

        print('Anonymous Get')
        print(user_line % (current_user, current_user.is_authenticated()))
        # NOTE(review): source indentation was lost; assumed only the
        # _check_token() call is conditional on BUMP_TOKEN.
        if BUMP_TOKEN:
            _check_token()
        print(user_line % (current_user, current_user.is_authenticated()))
        return make_json_response({}, 200)
class SignInView(MethodView):
    """Sign-in endpoint that recreates the test user and then logs it in."""

    def post(self):
        """Handle ``POST /signin`` with a JSON body holding email and password.

        Returns 400 when the body is not a JSON object containing both
        ``email`` and ``password``, 201 when the supplied password verifies
        and the user is logged in, and 403 otherwise.
        """
        print('SigninView Post')

        data = request.json
        # request.json can be None (e.g. non-JSON content type) and the keys
        # may be absent; answer 400 instead of raising TypeError/KeyError.
        if (not isinstance(data, dict)
                or 'email' not in data
                or 'password' not in data):
            return make_json_response({}, 400)
        email = data['email'].lower()

        # HACK: create a user here
        # HACK: don't do this in a real app!
        User.objects(email=email).delete()
        # The stored hash comes from the module-level ``password`` (the test
        # credential), not from the request, so verification below is a
        # genuine check of what the client sent.
        encrypted_password = encrypt_password(password)
        user_datastore.create_user(email=email, password=encrypted_password)

        user = User.objects(email=email).first()
        if user and verify_password(data['password'], user.password):
            # No space after the tab: Python 2's print statement emits no
            # separator after a tab, so this matches the original output.
            user_line = '\t%s %s'
            print(user_line % (current_user, current_user.is_authenticated()))
            login_user(user)
            print(user_line % (current_user, current_user.is_authenticated()))
            return make_json_response({}, 201)

        return make_json_response({}, 403)
def on_request_started(sender, **extra):
    """``request_started`` receiver: print the user around the token check."""
    # No space after the tab: Python 2's print statement emits no separator
    # after a tab, so this matches the original output.
    user_line = '\t%s %s'

    print('Request Started')
    print(user_line % (current_user, current_user.is_authenticated()))
    # NOTE(review): source indentation was lost; assumed only the
    # _check_token() call is conditional on BUMP_TOKEN.
    if BUMP_TOKEN:
        _check_token()
    print(user_line % (current_user, current_user.is_authenticated()))
def on_request_finished(sender, response, **extra):
    """``request_finished`` receiver: print the user and attach its token.

    When the current user is authenticated, the user's auth token is added
    to ``response`` under the header named by ``TOKEN``.
    """
    # No space after the tab: Python 2's print statement emits no separator
    # after a tab, so this matches the original output.
    user_line = '\t%s %s'

    print('Request Finished')
    print(user_line % (current_user, current_user.is_authenticated()))
    # NOTE(review): source indentation was lost; assumed only the
    # _check_token() call is conditional on BUMP_TOKEN.
    if BUMP_TOKEN:
        _check_token()
    print(user_line % (current_user, current_user.is_authenticated()))

    if not current_user.is_authenticated():
        return
    response.headers.add(TOKEN, current_user.get_auth_token())
# Blueprint exposing the three views under fixed URL rules.
blueprint = Blueprint('views', __name__)
blueprint.add_url_rule(
    '/authenticated',
    view_func=Authenticated.as_view('authenticated'),
)
blueprint.add_url_rule(
    '/anonymous',
    view_func=Anonymous.as_view('anonymous'),
)
blueprint.add_url_rule(
    '/signin',
    view_func=SignInView.as_view('signin'),
)
# create app
app = Flask(__name__)

# configure app: throwaway test credentials, optional Flask-Security
# features switched off, and the token header name taken from TOKEN.
app.config.update({
    'DEBUG': True,
    'SECRET_KEY': 'abc123',
    'SECURITY_PASSWORD_HASH': 'bcrypt',
    'SECURITY_PASSWORD_SALT': 'abc123',
    'SECURITY_SEND_REGISTER_EMAIL': False,
    'SECURITY_REGISTERABLE': False,
    'SECURITY_UNAUTHORIZED_VIEW': None,
    'SECURITY_CONFIRMABLE': False,
    'SECURITY_RECOVERABLE': False,
    'SECURITY_TRACKABLE': False,
    'SECURITY_PASSWORDLESS': False,
    'SECURITY_CHANGEABLE': False,
    'SECURITY_TOKEN_AUTHENTICATION_HEADER': TOKEN,
})
# add mongoengine: the connection and app are attached to the MongoEngine
# object by hand rather than through its constructor.
db = MongoEngine()
db.connection = mongoengine.connect('flask-security-test')
db.app = app

# add security, backed by the User and Role documents
user_datastore = MongoEngineUserDatastore(db, User, Role)
security = Security(app=app, datastore=user_datastore)
# register blueprints and signals; both receivers are connected with the
# app as sender.
app.register_blueprint(blueprint)
request_started.connect(on_request_started, sender=app)
request_finished.connect(on_request_finished, sender=app)
# Credentials of the test user. SignInView.post reads the module-level
# ``password`` when it recreates the user, so these names must stay global.
email = 'abc@123.com'
password = 'abc123'

# create the test client and begin testing
client = app.test_client(use_cookies=ENABLE_COOKIES)

# 1. Anonymous endpoint, no credentials.
print('**********')
x = client.get('/anonymous')
print(x.status_code)

# 2. Protected endpoint, no credentials.
print('**********')
x = client.get('/authenticated')
print(x.status_code)

# 3. Sign in with the test credentials.
print('**********')
x = client.post('/signin', content_type='application/json', data=json.dumps({
    "email": email,
    "password": password,
}))
print(x.status_code)

# Capture the token once from the sign-in response and send it back under
# the header the app is configured to read (TOKEN). Previously it was sent
# under a hard-coded 'Authentication-token' header and re-read from each
# preceding response, which raises KeyError when the header is absent.
token = x.headers.get(TOKEN)
token_headers = {TOKEN: token} if token else {}

# 4. Anonymous endpoint, with the token.
print('**********')
x = client.get('/anonymous', headers=token_headers)
print(x.status_code)

# 5. Protected endpoint, with the token.
print('**********')
x = client.get('/authenticated', headers=token_headers)
print(x.status_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment