@lepture

lepture/OI.md Secret

Last active September 2, 2019 12:22
Authlib v0.12 Changes

OpenID Connect

In this version, the Authlib "Grant" system is redesigned. You don't need to change anything for the normal grant types, but if you are working on an "OpenID Connect" server, here is what you need to do:

CodeFlow

authlib.oidc.core.grants.OpenIDCodeGrant is deprecated. Instead, use the OpenIDCode extension and register it on your AuthorizationCodeGrant:

from authlib.oauth2.rfc6749 import grants
from authlib.oidc.core.grants import OpenIDCode
from authlib.oidc.core import UserInfo

class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
    def create_authorization_code(self, client, grant_user, request):
        # ... create and return the authorization code
        ...

    # ... implement the other missing methods

class OpenIDCodeExtension(OpenIDCode):
    def get_jwt_config(self, grant):
        # key can be JWK Set
        return dict(key=private_key, alg='RS256', iss='https://...', exp=3600)
        
    def generate_user_info(self, user, scopes):
        # build the claims from the `user` argument; ... add further claims as needed
        return UserInfo({'sub': str(user.id), 'name': user.name})
        
    def exists_nonce(self, nonce, request):
        return is_authorization_code_exists(client_id=request.client_id, nonce=nonce)

# register it as an extension for AuthorizationCodeGrant
authorization_server.register_grant(AuthorizationCodeGrant, [OpenIDCodeExtension()])

ImplicitFlow

Register the implicit flow for OpenID Connect:

from authlib.oidc.core import grants
from authlib.oidc.core import UserInfo

class OpenIDImplicitGrant(grants.OpenIDImplicitGrant):
    def get_jwt_config(self):
        return dict(key=private_key, alg='RS256', iss='https://...', exp=3600)

    def generate_user_info(self, user, scopes):
        # build the claims from the `user` argument; ... add further claims as needed
        return UserInfo({'sub': str(user.id), 'name': user.name})

    def exists_nonce(self, nonce, request):
        return is_authorization_code_exists(client_id=request.client_id, nonce=nonce)

authorization_server.register_grant(OpenIDImplicitGrant)

HybridFlow

Hybrid flow is a mix of CodeFlow and ImplicitFlow. You MUST implement CodeFlow to support HybridFlow, and you also need to register one more grant, OpenIDHybridGrant:

from authlib.oidc.core import grants
from authlib.oidc.core import UserInfo

class OpenIDHybridGrant(grants.OpenIDHybridGrant):
    def create_authorization_code(self, client, grant_user, request):
        nonce = request.data.get('nonce')
        return generate_authorization_code(
            client, grant_user, request, nonce=nonce)

    def get_jwt_config(self):
        return dict(key=private_key, alg='RS256', iss='https://...', exp=3600)

    def generate_user_info(self, user, scopes):
        # build the claims from the `user` argument; ... add further claims as needed
        return UserInfo({'sub': str(user.id), 'name': user.name})

    def exists_nonce(self, nonce, request):
        return is_authorization_code_exists(client_id=request.client_id, nonce=nonce)

authorization_server.register_grant(OpenIDHybridGrant)

NOTE: YOU MUST also implement CodeFlow.
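
The hybrid grant's create_authorization_code calls a generate_authorization_code helper that is not defined in this note. Here is a rough sketch of one way to write it, assuming the helper should persist the nonce with the code and return the code string. The AUTHORIZATION_CODES dict and the attribute names read from client, grant_user, and request (client_id, id, redirect_uri, scope) are assumptions for illustration only:

```python
import secrets

# hypothetical in-memory store, keyed by the code string;
# a real server would write a row to its database instead
AUTHORIZATION_CODES = {}


def generate_authorization_code(client, grant_user, request, nonce=None):
    """Create a random code, remember its context, and return it."""
    code = secrets.token_urlsafe(36)
    AUTHORIZATION_CODES[code] = {
        'client_id': client.client_id,
        'user_id': grant_user.id,
        'redirect_uri': request.redirect_uri,
        'scope': request.scope,
        # stored so that exists_nonce can detect a replayed nonce later
        'nonce': nonce,
    }
    return code
```

Keeping the nonce next to the code is what lets the same lookup serve both the code flow and the hybrid flow.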

Refresh Token Grant

In this version, Authlib added a revoke_old_credential method on RefreshTokenGrant. Before the change:

from authlib.oauth2.rfc6749 import grants

class RefreshTokenGrant(grants.RefreshTokenGrant):
    def authenticate_refresh_token(self, refresh_token):
        item = Token.query.filter_by(refresh_token=refresh_token).first()
        # define is_refresh_token_active by yourself
        # usually, you should check whether the refresh token is expired or revoked
        if item and item.is_refresh_token_active():
            return item

    def authenticate_user(self, credential):
        return User.query.get(credential.user_id)

After the change:

from authlib.oauth2.rfc6749 import grants

class RefreshTokenGrant(grants.RefreshTokenGrant):
    def authenticate_refresh_token(self, refresh_token):
        item = Token.query.filter_by(refresh_token=refresh_token).first()
        # define is_refresh_token_active by yourself
        # usually, you should check whether the refresh token is expired or revoked
        if item and item.is_refresh_token_active():
            return item

    def authenticate_user(self, credential):
        return User.query.get(credential.user_id)

    def revoke_old_credential(self, credential):
        credential.revoked = True
        db.session.add(credential)
        db.session.commit()
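
Both versions call item.is_refresh_token_active(), which you define on your own token model. As an illustration only, the check could look like the sketch below. It uses a plain dataclass rather than a database model, and the issued_at and refresh_expires_in fields are assumed names, not something Authlib prescribes:

```python
import time
from dataclasses import dataclass, field


@dataclass
class Token:
    # hypothetical token record; a real model would be a database table
    refresh_token: str
    user_id: int
    issued_at: float = field(default_factory=time.time)
    refresh_expires_in: int = 30 * 24 * 3600  # assumed lifetime: 30 days
    revoked: bool = False

    def is_refresh_token_active(self, now=None):
        """A refresh token is active if it is neither revoked nor expired."""
        if self.revoked:
            return False
        now = time.time() if now is None else now
        return now < self.issued_at + self.refresh_expires_in
```

Once revoke_old_credential sets revoked to True, this check makes authenticate_refresh_token reject the old refresh token on its next use.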

lepture commented Jul 10, 2019