Skip to content

Instantly share code, notes, and snippets.

@jarshwah
Last active August 29, 2015 14:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jarshwah/c5b9abebb452f2e3286f to your computer and use it in GitHub Desktop.
Save jarshwah/c5b9abebb452f2e3286f to your computer and use it in GitHub Desktop.
Django Custom Auth Backend - if refactored
import base64
import hashlib
import hmac
import requests
import time
from django.contrib.auth import get_user_model
from django.conf import settings
from django.core.cache import cache
from django.utils.http import unquote
from .models import Tenant, Task, UserTask
class APIException(Exception):
pass
class APIAuth(requests.auth.AuthBase):
def __call__(self, prepared):
prepared.prepare_url(prepared.url, dict(
api_key=settings.API_KEY,
timestamp=int(time.time()*1000)))
signature = base64.b64encode(hmac.new(
settings.SHARED_SECRET.encode(),
msg=unquote(prepared.url).encode(),
digestmod=hashlib.sha256
).digest()).decode()
prepared.headers['Authorization'] = "signature {0}".format(signature)
return prepared
class APIBackend(object):
def __init__(self):
self.auth = APIAuth()
self.api_key = settings.API_KEY
self.gen_user = settings.API_USER
self.gen_pw = settings.API_PASS
self.cache_key = 'api_session_key'
self.urls = {
method : settings.API_URL + method
for method in (
'login', 'authenticate', 'change_password', 'get_roles')}
def authenticate(self, username=None, password=None, **kwargs):
resp = self._api_authenticate(username, password)
if resp.ok:
r = resp.json().get('Result')
User = get_user_model()
try:
user = User._default_manager.get_by_natural_key(username)
except User.DoesNotExist:
user = User(
username=username,
first_name=r.get('firstName', 'Unknown'),
last_name=r.get('lastName', 'Unknown'),
email=r.get('email', 'none@example.com'),
)
user.save()
user.set_unusable_password()
self._cache_permissions(user)
return user
return None
def get_user(self, user_id):
User = get_user_model()
try:
return User._default_manager.get(pk=user_id)
except User.DoesNotExist:
return None
def get_all_permissions(self, user_obj, obj=None):
if not hasattr(user_obj, '_api_perm_cache'):
user_tasks = UserTask.objects.select_related(
'user', 'task').filter(user=user_obj)
perms = set("%s.%s" % (ut.task.module, ut.task.name)
for ut in user_tasks)
user_obj._api_perm_cache = perms
return user_obj._api_perm_cache
def has_perm(self, user_obj, perm, obj=None):
return perm in self.get_all_permissions(user_obj, obj)
def has_module_perm(self, user_obj, module):
return any(
perm for perm in self.get_all_permissions(user_obj)
if perm[:perm.index('.')] == module)
def change_password(self, user_obj, current_password, new_password):
resp = self._api_change_password(user_obj.username, current_password, new_password)
return resp.ok
@property
def api_session_id(self):
if not hasattr(self, '_api_session_id'):
sid = cache.get(self.cache_key)
if sid is None:
self._api_login()
sid = cache.get(self.cache_key)
self._api_session_id = sid
return self._api_session_id
def _api_login(self):
url = self.urls['login']
post_data = {'username': self.gen_user, 'password': self.gen_pw}
resp = requests.post(url, data=post_data, auth=self.auth)
if resp.ok:
session_id = resp.json().get('Result').get('sessionId')
cache.set(self.cache_key, session_id, timeout=60*60*24*1) # 1 day
self._api_session_id = session_id
return resp
raise APIException(
"Could not login to API. Status Code %s" % resp.status_code)
def _api_authenticate(self, username, password):
url = self.urls['authenticate']
get_data = {'sessionId': self.api_session_id}
post_data = {'username': username, 'password': password}
resp = requests.post(
url,
params=get_data,
data=post_data,
auth=self.auth)
return resp
def _api_get_roles(self, username):
url = self.urls['get_roles']
get_data = {'username': username, 'sessionId': self.api_session_id}
resp = requests.get(url, params=get_data, auth=self.auth)
return resp
def _cache_permissions(self, user):
pass # removed for brevity
def _api_change_password(self, username, old_password, new_password):
url = self.urls['change_password']
post_data = {
'username': username,
'oldpassword': old_password,
'newpassword': new_password
}
get_data = {'sessionId': self.api_session_id}
resp = requests.post(url, params=get_data, data=post_data, auth=self.auth)
return resp
from collections import OrderedDict
from django.contrib.auth.forms import PasswordChangeForm
from crispy_forms.helper import FormHelper
class APIPasswordChangeForm(PasswordChangeForm):
# this is now only needed for styling purposes
helper = FormHelper()
helper.form_tag = False
helper.label_class = 'col-md-5'
helper.field_class = 'col-md-7'
def __init__(self, *args, **kwargs):
user = kwargs.pop('user')
super(APIPasswordChangeForm, self).__init__(user, *args, **kwargs)
# ensures correct ordering of form fields
APIPasswordChangeForm.base_fields = OrderedDict(
(k, APIPasswordChangeForm.base_fields[k])
for k in ['old_password', 'new_password1', 'new_password2']
)
from django.core.urlresolvers import reverse_lazy
from django.views.generic.edit import FormView
from braces.views import LoginRequiredMixin, UserFormKwargsMixin
from .forms import APIPasswordChangeForm
class ChangePasswordFormView(UserFormKwargsMixin, FormView):
template_name = 'api/change_password.html'
form_class = APIPasswordChangeForm
success_url = reverse_lazy('api:change_password_success')
def form_valid(self, form):
form.save()
return super(ChangePasswordFormView, self).form_valid(form)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment