Last active
September 7, 2022 21:39
-
-
Save christian-hawk/3c9b982cd2e226fb27537665a770036b to your computer and use it in GitHub Desktop.
inbound-saml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# oxAuth is available under the MIT License (2008). See http://opensource.org/licenses/MIT for full text.
# Copyright (c) 2016, Gluu
#
# Author: Christian Eland
#
from org.gluu.service.cdi.util import CdiUtil | |
from org.gluu.oxauth.security import Identity | |
from org.gluu.model.custom.script.type.auth import PersonAuthenticationType | |
from org.gluu.oxauth.service import AuthenticationService | |
from org.gluu.util import StringHelper | |
from org.gluu.jsf2.service import FacesService | |
from org.gluu.oxauth.util import ServerUtil | |
from org.gluu.oxauth.model.common import User | |
from org.gluu.oxauth.service.common import UserService | |
from org.gluu.oxauth.model.jwt import Jwt | |
from urllib import unquote | |
import json | |
import base64 | |
import java | |
# Local user attribute name -> attribute name looked up in the incoming
# profile (see ProfileMapper.map). 'uid' and 'mail' intentionally point at
# the same profile attribute, so the user id is the e-mail address.
MAPPING = dict(
    uid='urn:oid:0.9.2342.19200300.100.1.3',  # email
    displayName='urn:oid:2.16.840.1.113730.3.1.241',
    cn='urn:oid:2.5.4.3',
    givenName='urn:oid:2.5.4.42',
    sn='urn:oid:2.5.4.4',
    mail='urn:oid:0.9.2342.19200300.100.1.3',
)

# Host used to build the external redirect URL in prepareForStep.
PROXY_HOST = 'idpproxy-test.usg.edu'
class Helper():
    '''
    Small collection of helpers for building and resolving oxExternalUid values.
    '''

    @staticmethod
    def generateRedirectUrl():
        '''
        generates url with provider

        Placeholder: no implementation exists yet, so this returns None.
        Declared static so it can be called both on the class and on an
        instance (the original signature took no arguments at all).
        '''
        return None

    def generateExternalUid(self, providerHost, profile):
        '''
        generates id in format inbound-saml:provider:uid

        providerHost -- host name of the provider the user came from
        profile      -- dict-like profile; must contain the
                        'urn:oid:0.9.2342.19200300.100.1.1' attribute
                        (KeyError is raised otherwise)
        '''
        nickname = 'inbound-saml'
        uid = profile['urn:oid:0.9.2342.19200300.100.1.1']
        externalUid = '%s:%s:%s' % (nickname, providerHost, uid)
        return externalUid

    def getUserByExternalUid(self, externalId, userService, uid=None):
        '''
        Looks a user up by oxExternalUid, migrating legacy entries on the fly.

        externalId  -- id in the current format (see generateExternalUid)
        userService -- service exposing getUserByAttribute and updateUser
        uid         -- optional bare user id used to build the legacy
                       'passport-saml:uid' value. When omitted it is taken
                       as the text after the last ':' of externalId
                       (NOTE(review): assumes the uid itself contains no ':').

        Returns the user that was found, or None when neither format matches.
        '''
        user = userService.getUserByAttribute("oxExternalUid", externalId, True)
        if user is not None:
            return user

        if uid is None:
            uid = externalId.rsplit(':', 1)[-1]
        oldFormat = "passport-%s:%s" % ("saml", uid)
        user = userService.getUserByAttribute("oxExternalUid", oldFormat, True)
        if user is None:
            return None

        # Migrate to newer format: drop the legacy value, keep everything
        # else, and make sure the new value is present exactly once.
        currentValues = user.getAttributeValues("oxExternalUid") or []
        migratedValues = []
        for value in currentValues:
            if value != oldFormat and value not in migratedValues:
                migratedValues.append(value)
        if externalId not in migratedValues:
            migratedValues.append(externalId)
        user.setAttribute("oxExternalUid", migratedValues, True)
        print("Migrating user's oxExternalUid to newer format 'inbound-saml:provider:uid'")
        userService.updateUser(user)
        return user
class ProfileMapper():
    '''
    Copies attributes from an incoming profile onto a user object according
    to a {userAttributeName: profileAttributeName} mapping.
    '''

    def __init__(self, mapping):
        # mapping: dict of user attribute name -> profile attribute name
        self.mapping = mapping

    def map(self, profile, user):
        '''
        Sets on user every mapped attribute that is present in profile.

        profile -- dict-like object keyed by profile attribute name
        user    -- object exposing setAttribute(name, value)

        Attributes missing from the profile are skipped, so they are never
        set from an undefined or previously seen value. Returns the same
        user object that was passed in.
        '''
        for userAttributeName in self.mapping:
            profileAttributeName = self.mapping[userAttributeName]
            print('looking for %s in profile...' % profileAttributeName)
            if profileAttributeName not in profile:
                print('NOT FOUND!')
                continue
            print('FOUND!')
            profileAttributeValue = profile[profileAttributeName]
            print('mapping %s to %s' % (userAttributeName, profileAttributeValue))
            user.setAttribute(userAttributeName, profileAttributeValue)
        return user
class PersonAuthentication(PersonAuthenticationType):
    '''
    Single-step person authentication script: step 1 redirects the browser
    to the proxy at PROXY_HOST, and authenticate() maps the profile returned
    in the 'user' request parameter onto a local user, creating it if needed.
    '''

    def __init__(self, currentTimeMillis):
        self.currentTimeMillis = currentTimeMillis

    def init(self, customScript, configurationAttributes):
        print("InboundSaml. Initialization")
        print('configurationATtributes: %s' % configurationAttributes)
        print("InboundSaml. Initialized successfully")
        return True

    def destroy(self, configurationAttributes):
        print("InboundSaml. Destroy")
        print("InboundSaml. Destroyed successfully")
        return True

    def getAuthenticationMethodClaims(self, requestParameters):
        return None

    def getApiVersion(self):
        return 11

    def isValidAuthenticationMethod(self, usageType, configurationAttributes):
        return True

    def getAlternativeAuthenticationMethod(self, usageType, configurationAttributes):
        return None

    def _readProviderHost(self, sessionAttributes):
        '''
        Decodes the 'providerHost' session attribute.

        The attribute is expected to hold url-safe base64 of a JSON object
        with a 'providerHost' key. Returns that value, or None when the
        session attribute is absent.
        '''
        encoded = sessionAttributes.get("providerHost")
        if encoded is None:
            return None
        encoded = str(encoded)
        # Restore any '=' padding stripped in transit; a no-op when the
        # value is already correctly padded.
        encoded += '=' * (-len(encoded) % 4)
        providerHostParamValue = base64.urlsafe_b64decode(encoded)
        return json.loads(providerHostParamValue)['providerHost']

    def authenticate(self, configurationAttributes, requestParameters, step):
        '''
        Authenticates the user described by the 'user' request parameter.

        The parameter is url-decoded and parsed as JSON, mapped through
        MAPPING onto a new User, and matched against existing users by
        oxExternalUid; when no user matches, the mapped user is added.
        Returns the result of AuthenticationService.authenticate, or False
        when the profile or the providerHost session attribute is missing.

        NOTE(review): the profile is read straight from a request parameter
        and no signature or integrity check on it is visible in this script.
        Confirm the proxy response is validated before it reaches this point.
        '''
        print('Entered authenticate for step %s' % step)
        authenticationService = CdiUtil.bean(AuthenticationService)

        rawUser = requestParameters.get('user')
        if rawUser is None or len(rawUser) == 0:
            print('Inbound-saml: no user profile in request parameters')
            return False
        profile = json.loads(unquote(rawUser[0]).decode('utf8'))

        profileMapper = ProfileMapper(MAPPING)
        mappedUser = profileMapper.map(profile, User())

        # setup oxTrust attributes
        print('Inbound-saml: Setting oxtrust attributes...')
        # The profile is keyed by the names in MAPPING; a plain 'mail' key
        # is still accepted as a fallback.
        mail = profile.get(MAPPING['mail'], profile.get('mail'))
        if mail is not None:
            # json.dumps quotes and escapes the value so the result is
            # always valid JSON.
            mappedUser.setAttribute('oxTrustEmail', ['{"value":%s, "primary":false}' % json.dumps(mail)])

        identity = CdiUtil.bean(Identity)
        sessionAttributes = identity.getSessionId().getSessionAttributes()
        providerHost = self._readProviderHost(sessionAttributes)
        if providerHost is None:
            print('Inbound-saml: providerHost not found in session attributes')
            return False

        helper = Helper()
        oxExternalUid = helper.generateExternalUid(providerHost, profile)

        userService = CdiUtil.bean(UserService)
        foundUser = userService.getUserByAttribute("oxExternalUid", oxExternalUid, True)
        if foundUser is None:
            # First login through this provider: persist the mapped user.
            mappedUser.setAttribute('oxExternalUid', oxExternalUid)
            foundUser = userService.addUser(mappedUser, True)

        username = foundUser.getUserId()
        logged_in = authenticationService.authenticate(username)
        return logged_in

    def prepareForStep(self, configurationAttributes, requestParameters, step):
        '''
        Redirects the browser to the proxy for step 1.

        Returns True once the redirect has been issued, and False when the
        providerHost session attribute is missing or step is not 1.
        '''
        identity = CdiUtil.bean(Identity)
        sessionAttributes = identity.getSessionId().getSessionAttributes()
        print('Entered prepareForStep - step %s' % step)
        providerHost = self._readProviderHost(sessionAttributes)
        if providerHost is None:
            print('Inbound-saml: providerHost not found in session attributes')
            return False
        print('Inbound-saml: Preparing to start flow with providerHost %s' % providerHost)
        if step != 1:
            return False

        facesService = CdiUtil.bean(FacesService)
        # NOTE(review): providerHost is interpolated without URL-encoding;
        # confirm it can only ever contain a plain host name.
        redirectUrl = 'https://%s/inbound-saml/sp/authenticate?providerHost=%s' % (PROXY_HOST, providerHost)
        facesService.redirectToExternalURL(redirectUrl)
        return True

    def getExtraParametersForStep(self, configurationAttributes, step):
        return None

    def getCountAuthenticationSteps(self, configurationAttributes):
        return 1

    def getPageForStep(self, configurationAttributes, step):
        return ""

    def getNextStep(self, configurationAttributes, requestParameters, step):
        return -1

    def getLogoutExternalUrl(self, configurationAttributes, requestParameters):
        print("Get external logout URL call")
        return None

    def logout(self, configurationAttributes, requestParameters):
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment