Created
July 17, 2022 07:38
-
-
Save viraj071/f023b8e0dd8469eee34295150a5816bb to your computer and use it in GitHub Desktop.
Using next-auth.js and python-social-auth for multi-provider account linking with Django
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import axios from "axios"; | |
import NextAuth from "next-auth" | |
import { getToken } from "next-auth/jwt"; | |
import Google from "next-auth/providers/google"; | |
import Twitter from "next-auth/providers/twitter"; | |
import { isJwtExpired } from "../../../constants/Utils"; | |
var qs = require('qs'); | |
/**
 * Exchanges the stored refresh token for a new access token at the backend.
 *
 * POSTs `token.refreshToken` to `/api/auth/token/refresh/` and reads
 * `access` / `refresh` from `response.data.data`.
 *
 * @param {object} token - Current next-auth JWT; its `refreshToken` is sent.
 * @returns {Promise<object>} On success, a copy of `token` with the new
 *   `accessToken`, the new `refreshToken` if one was returned (otherwise the
 *   existing one), and `error` cleared. On any failure, a copy of `token`
 *   with `error: "RefreshTokenError"`.
 */
async function refreshAccessToken(token) {
  try {
    const response = await axios.post(
      process.env.NEXT_PUBLIC_BACKEND_BASE + "/api/auth/token/refresh/",
      { refresh: token.refreshToken }
    );
    const { access, refresh } = response.data.data;
    if (!access) {
      // A response without an access token is unusable; report it through
      // the same error path instead of storing `accessToken: undefined`.
      throw new Error("Token refresh response did not contain an access token");
    }
    return {
      ...token,
      accessToken: access,
      // The backend may answer with only a new access token (no rotation);
      // keep the current refresh token rather than overwriting it with undefined.
      refreshToken: refresh || token.refreshToken,
      // A successful refresh supersedes any error recorded by an earlier attempt.
      error: undefined,
    };
  } catch (error) {
    console.log(error);
    return {
      ...token,
      error: "RefreshTokenError",
    };
  }
}
/**
 * Initial Google sign-in: hands Google's OAuth tokens to the DRF backend and
 * stores the backend's own access/refresh pair on the next-auth JWT.
 *
 * @param {object} token - next-auth JWT being built for this sign-in.
 * @param {object} account - next-auth account object for the Google provider.
 * @returns {Promise<object>} `token` extended with `accessToken` and
 *   `refreshToken`, or with `error: "NewUserTokenError"` if the request failed.
 */
async function registerGoogleAccount(token, account) {
  const { access_token, refresh_token, expires_at, token_type } = account;
  try {
    // make a POST request to the DRF backend
    const response = await axios.post(
      process.env.NEXT_PUBLIC_BACKEND_BASE + "/api/auth/register-by-token/google-oauth2/",
      {
        access_token,
        refresh_token,
        // `expires_at` is compared against the current epoch seconds to send
        // the remaining lifetime instead of an absolute timestamp.
        expires_in: expires_at - Math.floor(Date.now() / 1000),
        token_type,
      }
    );
    const { access, refresh } = response.data.data;
    return {
      ...token,
      accessToken: access,
      refreshToken: refresh,
    };
  } catch (error) {
    console.log(error);
    return {
      ...token,
      error: "NewUserTokenError",
    };
  }
}

/**
 * Twitter "sign-in": links the Twitter account to the user who is already
 * logged in. Twitter is only used for linking at the backend, so the JWT of
 * the existing session is kept rather than the one next-auth built for Twitter.
 *
 * @param {object} req - Incoming API request, used to read the existing session JWT.
 * @param {object} account - next-auth account object for the Twitter provider.
 * @returns {Promise<object>} The existing session JWT on success; otherwise that
 *   JWT (or an empty object when there is none) with `error: "NewUserTokenError"`.
 */
async function linkTwitterAccount(req, account) {
  const { oauth_token, oauth_token_secret } = account;
  const sessionToken = await getToken({ req });
  if (!sessionToken || !sessionToken.accessToken) {
    // Linking needs an authenticated backend user. Report this explicitly
    // instead of relying on a TypeError from dereferencing a missing token.
    console.log("Twitter account linking requires an existing session with a backend access token");
    return {
      ...sessionToken,
      error: "NewUserTokenError",
    };
  }
  try {
    // make a POST request to the DRF backend
    await axios.post(
      process.env.NEXT_PUBLIC_BACKEND_BASE + "/api/auth/associate-by-token/twitter/",
      {
        access_token: qs.stringify({
          oauth_token,
          oauth_token_secret,
        }),
      },
      {
        headers: {
          Authorization: `Bearer ${sessionToken.accessToken}`,
        },
      }
    );
    return sessionToken;
  } catch (error) {
    console.log(error);
    return {
      ...sessionToken,
      error: "NewUserTokenError",
    };
  }
}

/**
 * next-auth API route handler.
 *
 * Configures the Google and Twitter providers, hides Twitter on the default
 * sign-in page, and wires the `jwt` / `session` callbacks that keep the
 * backend's access and refresh tokens on the next-auth JWT and session.
 *
 * @param {object} req - Next.js API request; `method` and `query.nextauth` are read.
 * @param {object} res - Next.js API response, passed through to NextAuth.
 * @returns {Promise<*>} Whatever `NextAuth(req, res, options)` resolves to.
 */
export default async function auth(req, res) {
  const google = Google({
    clientId: process.env.GOOGLE_CLIENT_ID,
    clientSecret: process.env.GOOGLE_CLIENT_SECRET,
    authorization: {
      params: {
        access_type: "offline",
        response_type: "code",
        scope: 'openid profile email https://www.googleapis.com/auth/gmail.send https://www.googleapis.com/auth/spreadsheets.readonly',
      },
    },
  });
  const twitter = Twitter({
    clientId: process.env.TWITTER_CLIENT_ID,
    clientSecret: process.env.TWITTER_CLIENT_SECRET,
    authorization: {
      params: {
        include_email: true,
      },
    },
  });

  const isDefaultSigninPage = req.method === "GET" && req.query.nextauth.includes("signin");
  // Twitter is hidden when you visit `/api/auth/signin`. The list is built
  // explicitly so the result does not depend on the order of the providers.
  const providers = isDefaultSigninPage ? [google] : [google, twitter];

  return await NextAuth(req, res, {
    providers,
    callbacks: {
      async jwt({ token, user, account }) {
        // Initial sign in
        if (account && user) {
          if (account.provider === "google") {
            return registerGoogleAccount(token, account);
          }
          if (account.provider === "twitter") {
            return linkTwitterAccount(req, account);
          }
          return {
            ...token,
            error: "InvalidProviderError",
          };
        }
        // Subsequent calls: refresh the backend access token once it has expired.
        if (isJwtExpired(token.accessToken)) {
          return refreshAccessToken(token);
        }
        return token;
      },
      async session({ session, token }) {
        session.accessToken = token.accessToken;
        session.refreshToken = token.refreshToken;
        session.error = token.error;
        return session;
      },
    },
  });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from social_core.backends.twitter import TwitterOAuth | |
def use_logged_in_user(backend, *args, **kwargs):
    """Pipeline step that re-emits the ``user`` already passed to the pipeline.

    Returns ``{'user': user}`` when a truthy ``user`` keyword argument is
    present, and ``None`` otherwise. ``backend`` and positional arguments are
    accepted but not used.
    """
    current_user = kwargs.get('user')
    return {'user': current_user} if current_user else None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Ordered pipeline steps, listed as dotted paths. The project-specific
# `use_logged_in_user` step is placed right after the stock `auth_allowed`
# step and before `social_user`.
SOCIAL_AUTH_PIPELINE = (
    # Stock social_auth steps run first.
    'social_core.pipeline.social_auth.social_details',
    'social_core.pipeline.social_auth.social_uid',
    'social_core.pipeline.social_auth.auth_allowed',

    # Check if request is from an authenticated user
    'authentication.pipeline.use_logged_in_user',

    # Remaining stock steps.
    'social_core.pipeline.social_auth.social_user',
    'social_core.pipeline.user.get_username',
    'social_core.pipeline.user.create_user',
    'social_core.pipeline.social_auth.associate_user',
    'social_core.pipeline.social_auth.load_extra_data',
    'social_core.pipeline.user.user_details',
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@api_view(['POST'])
@psa()
@permission_classes((AllowAny,))
def RegisterByToken(request, backend):
    """Authenticate with a provider access token and return backend tokens.

    The request body is validated with ``SocialAuthRequestSerializer``; its
    ``access_token`` is passed to ``request.backend.do_auth`` together with
    the full validated payload as ``response``. If ``do_auth`` returns a user,
    the tokens from ``get_tokens_for_user`` are serialized and returned;
    otherwise the response is a bare 401.
    """
    # If it's needed, request.backend and request.strategy will be loaded with
    # the current backend and strategy.
    payload = SocialAuthRequestSerializer(data=request.data)
    if payload.is_valid(raise_exception=True):
        provider_token = payload.validated_data.get('access_token')
        user = request.backend.do_auth(provider_token, response=payload.validated_data)
        if not user:
            return Response(status=401)
        issued = SocialAuthResponseSerializer(get_tokens_for_user(user))
        return Response(issued.data)
def get_tokens_for_user(user):
    """Create a refresh/access token pair for ``user`` and record the login.

    Calls ``RefreshToken.for_user(user)``, then ``update_last_login(None, user)``,
    and returns ``{'refresh': ..., 'access': ...}`` with both tokens as strings.
    """
    token_pair = RefreshToken.for_user(user)
    update_last_login(None, user)
    refresh_value = str(token_pair)
    access_value = str(token_pair.access_token)
    return {'refresh': refresh_value, 'access': access_value}
@api_view(['POST'])
@psa()
@permission_classes((IsAuthenticated, ))
def AssociateByToken(request, backend):
    """Associate a provider account with the requesting user.

    The request body is validated with ``SocialAuthRequestSerializer``; its
    ``access_token`` is passed to ``request.backend.do_auth`` along with the
    validated payload as ``response`` and ``request.user`` as ``user``.
    Responds 200 when ``do_auth`` returns a user and 401 otherwise.
    """
    # If it's needed, request.backend and request.strategy will be loaded with
    # the current backend and strategy.
    payload = SocialAuthRequestSerializer(data=request.data)
    if payload.is_valid(raise_exception=True):
        provider_token = payload.validated_data.get('access_token')
        linked_user = request.backend.do_auth(
            provider_token,
            response=payload.validated_data,
            user=request.user,
        )
        if not linked_user:
            return Response(status=401)
        return Response(status=200)
@api_view(['POST'])
@psa()
@permission_classes((IsAuthenticated, ))
def DisconnectBackend(request, backend, uid):
    """Disconnect the social association identified by ``backend`` and ``uid``.

    Looks the association up with ``UserSocialAuth.get_social_auth(backend, uid)``
    and passes its id to ``request.backend.disconnect`` for ``request.user``.

    Responds 404 when no matching association is found, 200 after
    ``disconnect`` has been called.
    """
    # If it's needed, request.backend and request.strategy will be loaded with
    # the current backend and strategy.
    association = UserSocialAuth.get_social_auth(backend, uid)
    if association is None:
        # An unknown backend/uid pair would otherwise fail on `None.id` and
        # turn a client mistake into a server error.
        return Response(status=404)
    request.backend.disconnect(user=request.user, association_id=association.id)
    return Response(status=200)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment