Skip to content

Instantly share code, notes, and snippets.

@viraj071
Created July 17, 2022 07:38
Show Gist options
  • Save viraj071/f023b8e0dd8469eee34295150a5816bb to your computer and use it in GitHub Desktop.
Save viraj071/f023b8e0dd8469eee34295150a5816bb to your computer and use it in GitHub Desktop.
Using next-auth.js and python-social-auth for multi-provider account linking with Django
import axios from "axios";
import NextAuth from "next-auth"
import { getToken } from "next-auth/jwt";
import Google from "next-auth/providers/google";
import Twitter from "next-auth/providers/twitter";
import { isJwtExpired } from "../../../constants/Utils";
var qs = require('qs');
/**
 * Exchanges the stored refresh token for a new access token at the backend.
 *
 * POSTs `token.refreshToken` to `/api/auth/token/refresh/` (relative to
 * `NEXT_PUBLIC_BACKEND_BASE`) and reads `access` / `refresh` from
 * `response.data.data`.
 *
 * @param {object} token - The current JWT payload; `token.refreshToken` is
 *   sent to the backend.
 * @returns {Promise<object>} On success, a copy of `token` with the new
 *   `accessToken`, the new `refreshToken` (or the previous one when the
 *   backend did not return a replacement) and without any stale `error`.
 *   On failure, a copy of `token` with `error: "RefreshTokenError"`.
 */
async function refreshAccessToken(token) {
  try {
    const response = await axios.post(
      `${process.env.NEXT_PUBLIC_BACKEND_BASE}/api/auth/token/refresh/`,
      { refresh: token.refreshToken }
    );
    const { access, refresh } = response.data.data;

    // A previous failed attempt may have left `error` on the token; it must
    // not survive a successful refresh, otherwise the session keeps
    // reporting an error even though the tokens are valid again.
    const { error: _staleError, ...rest } = token;

    return {
      ...rest,
      accessToken: access,
      // Keep the current refresh token when the backend does not rotate it,
      // instead of overwriting it with `undefined`.
      refreshToken: refresh ?? token.refreshToken,
    };
  } catch (error) {
    console.log(error);
    return {
      ...token,
      error: "RefreshTokenError",
    };
  }
}
/**
 * Registers a freshly signed-in Google account with the DRF backend and
 * stores the backend-issued token pair on the NextAuth token.
 *
 * @param {object} token - JWT payload passed to the `jwt` callback.
 * @param {object} account - Provider account data from the `jwt` callback.
 * @returns {Promise<object>} `token` extended with `accessToken` and
 *   `refreshToken`, or with `error: "NewUserTokenError"` if the call fails.
 */
async function registerGoogleAccount(token, account) {
  try {
    const nowInSeconds = Math.floor(Date.now() / 1000);
    const payload = {
      access_token: account.access_token,
      refresh_token: account.refresh_token,
      // The provider reports an absolute expiry; the backend is sent the remaining lifetime.
      expires_in: account.expires_at - nowInSeconds,
      token_type: account.token_type,
    };
    // make a POST request to the DRF backend
    const { data } = await axios.post(
      `${process.env.NEXT_PUBLIC_BACKEND_BASE}/api/auth/register-by-token/google-oauth2/`,
      payload
    );
    const { access, refresh } = data.data;
    return { ...token, accessToken: access, refreshToken: refresh };
  } catch (error) {
    console.log(error);
    return { ...token, error: "NewUserTokenError" };
  }
}

/**
 * Links a Twitter account to the user who is already signed in.
 *
 * With twitter you still use the previous token, as this provider is only
 * used for linking purposes at the backend.
 *
 * @param {object} req - Incoming API request, used to read the existing token.
 * @param {object} account - Provider account data from the `jwt` callback.
 * @returns {Promise<object>} The existing token, or a copy of it with
 *   `error: "NewUserTokenError"` if the backend call fails.
 */
async function linkTwitterAccount(req, account) {
  const existingToken = await getToken({ req });
  try {
    const credentials = qs.stringify({
      oauth_token: account.oauth_token,
      oauth_token_secret: account.oauth_token_secret,
    });
    // make a POST request to the DRF backend
    await axios.post(
      `${process.env.NEXT_PUBLIC_BACKEND_BASE}/api/auth/associate-by-token/twitter/`,
      { access_token: credentials },
      { headers: { Authorization: `Bearer ${existingToken.accessToken}` } }
    );
    return existingToken;
  } catch (error) {
    console.log(error);
    return { ...existingToken, error: "NewUserTokenError" };
  }
}

/**
 * NextAuth API route handler.
 *
 * Configures the Google and Twitter providers and the `jwt` / `session`
 * callbacks, then hands the request over to NextAuth.
 *
 * @param {object} req - Incoming API request.
 * @param {object} res - Outgoing API response.
 * @returns {Promise<*>} Whatever `NextAuth(req, res, options)` resolves to.
 */
export default async function auth(req, res) {
  const googleProvider = Google({
    clientId: process.env.GOOGLE_CLIENT_ID,
    clientSecret: process.env.GOOGLE_CLIENT_SECRET,
    authorization: {
      params: {
        access_type: "offline",
        response_type: "code",
        scope: "openid profile email https://www.googleapis.com/auth/gmail.send https://www.googleapis.com/auth/spreadsheets.readonly",
      },
    },
  });
  const twitterProvider = Twitter({
    clientId: process.env.TWITTER_CLIENT_ID,
    clientSecret: process.env.TWITTER_CLIENT_SECRET,
    authorization: {
      params: {
        include_email: true,
      },
    },
  });

  const isDefaultSigninPage =
    req.method === "GET" && req.query.nextauth.includes("signin");
  // Will hide the `Twitter` when you visit `/api/auth/signin`
  const providers = isDefaultSigninPage
    ? [googleProvider]
    : [googleProvider, twitterProvider];

  const callbacks = {
    async jwt({ token, user, account }) {
      // Subsequent calls: only refresh the backend access token if it expired.
      if (!(account && user)) {
        return isJwtExpired(token.accessToken)
          ? refreshAccessToken(token)
          : token;
      }

      // Initial sign in
      switch (account.provider) {
        case "google":
          return registerGoogleAccount(token, account);
        case "twitter":
          return linkTwitterAccount(req, account);
        default:
          return { ...token, error: "InvalidProviderError" };
      }
    },

    async session({ session, token }) {
      // Expose the backend tokens and any error state on the session object.
      Object.assign(session, {
        accessToken: token.accessToken,
        refreshToken: token.refreshToken,
        error: token.error,
      });
      return session;
    },
  };

  return await NextAuth(req, res, { providers, callbacks });
}
from social_core.backends.twitter import TwitterOAuth
def use_logged_in_user(backend, *args, **kwargs):
    """Pipeline step that forwards an already known user.

    Looks up the ``user`` keyword argument and, when it is truthy, returns
    ``{'user': <that user>}``. When no truthy ``user`` was passed, returns
    ``None``. ``backend`` and any positional arguments are not used.
    """
    current_user = kwargs.get('user')
    return {'user': current_user} if current_user else None
# Dotted paths of the python-social-auth pipeline steps, in the order listed.
# Every entry lives under social_core.pipeline except the project-specific
# 'authentication.pipeline.use_logged_in_user', which is placed directly
# before 'social_core.pipeline.social_auth.social_user'.
SOCIAL_AUTH_PIPELINE = (
'social_core.pipeline.social_auth.social_details',
'social_core.pipeline.social_auth.social_uid',
'social_core.pipeline.social_auth.auth_allowed',
# Check if request is from an authenticated user
# NOTE(review): presumably this path points at the use_logged_in_user
# function shown in this file — confirm the module location.
'authentication.pipeline.use_logged_in_user',
'social_core.pipeline.social_auth.social_user',
'social_core.pipeline.user.get_username',
'social_core.pipeline.user.create_user',
'social_core.pipeline.social_auth.associate_user',
'social_core.pipeline.social_auth.load_extra_data',
'social_core.pipeline.user.user_details',
)
@api_view(['POST'])
@psa()
@permission_classes((AllowAny,))
def RegisterByToken(request, backend):
    """Authenticate a user from a provider access token and issue backend tokens.

    The request body is validated with ``SocialAuthRequestSerializer``; invalid
    input raises because ``is_valid`` is called with ``raise_exception=True``.
    The validated ``access_token`` is passed to ``request.backend.do_auth``,
    with the full validated payload as ``response``.

    Returns a ``Response`` carrying the serialized output of
    ``get_tokens_for_user`` when ``do_auth`` yields a user, otherwise a
    ``Response`` with status 401.
    """
    # If it's needed, request.backend and request.strategy will be loaded with the current
    # backend and strategy.
    payload = SocialAuthRequestSerializer(data=request.data)
    if not payload.is_valid(raise_exception=True):
        return None

    provider_token = payload.validated_data.get('access_token')
    user = request.backend.do_auth(provider_token, response=payload.validated_data)
    if not user:
        return Response(status=401)

    issued_tokens = SocialAuthResponseSerializer(get_tokens_for_user(user))
    return Response(issued_tokens.data)
def get_tokens_for_user(user):
    """Create a refresh/access token pair for ``user``.

    Builds a token via ``RefreshToken.for_user(user)``, calls
    ``update_last_login(None, user)``, and returns a dict with the string
    forms of the refresh token (``'refresh'``) and of its ``access_token``
    (``'access'``).
    """
    refresh_token = RefreshToken.for_user(user)
    update_last_login(None, user)
    token_pair = dict(
        refresh=str(refresh_token),
        access=str(refresh_token.access_token),
    )
    return token_pair
@api_view(['POST'])
@psa()
@permission_classes((IsAuthenticated, ))
def AssociateByToken(request, backend):
    """Link a provider account to the authenticated user.

    The request body is validated with ``SocialAuthRequestSerializer``; invalid
    input raises because ``is_valid`` is called with ``raise_exception=True``.
    The validated ``access_token`` is passed to ``request.backend.do_auth``
    together with the validated payload (as ``response``) and ``request.user``
    (as ``user``).

    Returns a ``Response`` with status 200 when ``do_auth`` yields a user,
    otherwise a ``Response`` with status 401.
    """
    # If it's needed, request.backend and request.strategy will be loaded with the current
    # backend and strategy.
    payload = SocialAuthRequestSerializer(data=request.data)
    if not payload.is_valid(raise_exception=True):
        return None

    provider_token = payload.validated_data.get('access_token')
    linked_user = request.backend.do_auth(
        provider_token,
        response=payload.validated_data,
        user=request.user,
    )
    return Response(status=200 if linked_user else 401)
@api_view(['POST'])
@psa()
@permission_classes((IsAuthenticated, ))
def DisconnectBackend(request, backend, uid):
    """Disconnect the social association identified by ``backend`` and ``uid``.

    Looks the association up with ``UserSocialAuth.get_social_auth`` and asks
    ``request.backend.disconnect`` to remove it for ``request.user``.

    Returns a ``Response`` with status 200 after the disconnect call, or
    status 404 when no association exists for the given ``backend``/``uid``.
    """
    # If it's needed, request.backend and request.strategy will be loaded with the current
    # backend and strategy.
    association = UserSocialAuth.get_social_auth(backend, uid)
    if association is None:
        # Without this guard the `.id` access below raises AttributeError and
        # the client receives a 500 for what is just an unknown association.
        return Response(status=404)

    request.backend.disconnect(user=request.user, association_id=association.id)
    return Response(status=200)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment