Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

GraphQL example

Graphene Tutorial

Example of authentication mutations and user queries in GraphQL API.

"""
First we need to create a type from our database models (Django in this case)
"""
import graphene
from graphene_django import DjangoObjectType
from ..models import User
class UserType(DjangoObjectType):
full_name = graphene.String()
photo = graphene.String()
class Meta:
model = User
only_fields = [
'id', 'key',
'first_name', 'last_name', 'full_name',
'phone_number', 'mobile_number', 'email', 'photo', 'role',
'is_active', 'last_login'
]
@staticmethod
def resolve_photo(obj, info):
if not obj.photo:
return None
return obj.photo.url
@staticmethod
def resolve_full_name(obj, info):
return obj.get_full_name()
"""
Queries are like views in REST framework - they return a list of UserTypes (list of all users)
or a single UserType (e.g. currently authenticated user)
"""
import graphene
from django.contrib.auth import get_user_model
from .decorators import login_required
from .types import UserType
User = get_user_model()
class AuthUserQuery:
"""
Currently authenticated user or None if user is anonymous
"""
auth_user = graphene.Field(UserType)
@classmethod
def resolve_auth_user(cls, obj, info):
request = info.context
if not request or not request.user or not request.user.is_authenticated:
return None
return request.user
class UserQuery:
users = graphene.List(UserType, project=graphene.String())
@classmethod
@login_required
def resolve_users(cls, obj, info):
return User.objects.all()
"""
Example mutations used for authentication
"""
import smtplib
import graphene
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.core.mail import send_mail
from django.db import IntegrityError
from .models import User
from .decorators import login_required
from .mutations import BaseMutation
from .types import UserType
class Login(BaseMutation):
class Arguments:
email = graphene.String()
password = graphene.String()
keep = graphene.Boolean()
user = graphene.Field(UserType)
@classmethod
def mutate(cls, obj, info, *, email, password, keep=False):
request = info.context
user = authenticate(request, email=email, password=password)
if user is None:
return cls(user=None, ok=False, error='Invalid email or password')
if not keep:
request.session.set_expiry(0)
login(request, user)
return cls(user=user)
class Logout(BaseMutation):
@classmethod
@login_required
def mutate(cls, obj, info):
request = info.context
logout(request)
return cls()
class Register(BaseMutation):
class Arguments:
email = graphene.String()
password = graphene.String()
user = graphene.Field(UserType)
@classmethod
def mutate(cls, obj, info, *, email, password):
email = email.strip()
try:
user = User.objects.create_user(
email=email, password=password)
except IntegrityError:
return cls(user=None, ok=False, error='Email already registered')
return cls(user=user)
class PasswordResetRequest(BaseMutation):
class Arguments:
email = graphene.String()
token = graphene.String()
@staticmethod
def send_mail(email, token):
msg = ('You\'ve requested password reset at https://{domain_web}/. '
'Please visit https://{domain_web}/password/new/?verify={token}'
' and set your new password.'
).format(domain_web=settings.DOMAIN_WEB, token=token)
send_mail(
'Password reset verification',
msg,
settings.DEFAULT_FROM_EMAIL,
[email],
)
@classmethod
def mutate(cls, obj, info, *, email):
token = User.objects.create_reset_password_token(email)
if not token:
return cls(ok=False, error='Invalid email', token=None)
try:
PasswordResetRequest.send_mail(email, token)
except smtplib.SMTPException:
return cls(ok=False, error='Email not sent')
return cls(token=token)
class PasswordReset(BaseMutation):
class Arguments:
token = graphene.String()
password = graphene.String()
@staticmethod
def send_mail(email):
msg = ('Your password has changed. '
'Please visit https://{domain_web}/ and verify.'
).format(domain_web=settings.DOMAIN_WEB)
send_mail(
'Password reset confirmation',
msg,
settings.DEFAULT_FROM_EMAIL,
[email],
)
@classmethod
def mutate(cls, obj, info, *, token, password):
user = User.objects.verify_reset_password_token(token)
if not user:
return cls(ok=False, error='Invalid token')
try:
PasswordReset.send_mail(user.email)
except smtplib.SMTPException:
# Token is already invalidated at this point, but email with
# confirmation hasn't been sent. Don't change password silently,
# rather request user to generate new token.
return cls(ok=False, error='Invalid token')
user.set_password(password)
user.save(update_fields=['password'])
return cls()
class AdminPasswordChange(BaseMutation):
"""
Change password for *any* user in database. Requires admin privileges.
"""
class Arguments:
email = graphene.String()
password = graphene.String()
@classmethod
def mutate(cls, obj, info, *, email, password):
if not info.context.user.is_admin:
return cls(ok=False, error='Not authorized')
user = User.objects.get(email__iexact=email)
user.set_password(password)
user.save(update_fields=['password'])
return cls()
"""
finally, we define the schema by listing queries and mutations (types are embedded automatically)
"""
from .queries import AuthUserQuery, UserQuery
from .mutations import (
Login, Logout, Register, PasswordResetRequest, PasswordReset,
AdminPasswordChange
)
class Query(AuthUserQuery, UserQuery):
pass
class Mutations:
auth_login = Login.Field()
auth_logout = Logout.Field()
auth_register = Register.Field()
auth_password_reset_request = PasswordResetRequest.Field()
auth_password_reset = PasswordReset.Field()
auth_password_change = AdminPasswordChange.Field()
schema = graphene.Schema(
query=Query,
mutation=Mutations)
import graphene
def authorization_required(auth_func):
"""
Factory for authorization decorators. Pass `auth_func` which returns when
authorized to perform action, False otherwise. `auth_func` receives
request and current object as a param.
"""
def decorator(function):
def wrapper(cls, obj, info, **kwargs):
if not auth_func(info.context, obj):
if issubclass(cls, graphene.Mutation):
return cls(ok=False, error='Not authorized')
else:
return None
return function(cls, obj, info, **kwargs)
return wrapper
return decorator
def is_authenticated(request, obj):
return request and request.user and request.user.is_authenticated
login_required = authorization_required(is_authenticated)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.