Skip to content

Instantly share code, notes, and snippets.

@christian-hawk
Last active September 7, 2022 21:39
Show Gist options
  • Save christian-hawk/3c9b982cd2e226fb27537665a770036b to your computer and use it in GitHub Desktop.
Save christian-hawk/3c9b982cd2e226fb27537665a770036b to your computer and use it in GitHub Desktop.
inbound-saml
# oxAuth is available under the MIT License (2008). See http://opensource.org/licenses/MIT for full text.
# Copyright (c) 2016, Gluu
#
# Author: Christian Eland
#
from org.gluu.service.cdi.util import CdiUtil
from org.gluu.oxauth.security import Identity
from org.gluu.model.custom.script.type.auth import PersonAuthenticationType
from org.gluu.oxauth.service import AuthenticationService
from org.gluu.util import StringHelper
from org.gluu.jsf2.service import FacesService
from org.gluu.oxauth.util import ServerUtil
from org.gluu.oxauth.model.common import User
from org.gluu.oxauth.service.common import UserService
from org.gluu.oxauth.model.jwt import Jwt
from urllib import unquote
import json
import base64
import java
# Attribute mapping consumed by ProfileMapper.map(): each key is the name of
# the local user attribute to set, each value is the name of the attribute to
# read from the incoming SAML profile (OID-style names).
MAPPING = {
'uid': 'urn:oid:0.9.2342.19200300.100.1.3', # email (same OID as 'mail' below)
'displayName': 'urn:oid:2.16.840.1.113730.3.1.241',
'cn': 'urn:oid:2.5.4.3',
'givenName': 'urn:oid:2.5.4.42',
'sn': 'urn:oid:2.5.4.4',
'mail': 'urn:oid:0.9.2342.19200300.100.1.3'
}
# Host name used by PersonAuthentication.prepareForStep() to build the
# 'https://<PROXY_HOST>/inbound-saml/sp/authenticate' redirect URL.
PROXY_HOST = 'idpproxy-test.usg.edu'
class Helper():
    '''
    Helper routines for building and resolving oxExternalUid values.
    '''

    def generateRedirectUrl(self):
        '''
        generates url with provider

        Placeholder: nothing is implemented yet, so this returns None.
        '''
        return None

    def generateExternalUid(self, providerHost, profile):
        '''
        generates id in format inbound-saml:provider:uid

        providerHost -- host name of the identity provider
        profile      -- dict-like profile; the uid is read from the key
                        'urn:oid:0.9.2342.19200300.100.1.1' (KeyError if absent)
        '''
        nickname = 'inbound-saml'
        uid = profile['urn:oid:0.9.2342.19200300.100.1.1']
        externalUid = '%s:%s:%s' % (nickname, providerHost, uid)
        return externalUid

    def getUserByExternalUid(self, externalId, userService):
        '''
        Finds a user by oxExternalUid, migrating legacy entries on the way.

        externalId  -- id in the format produced by generateExternalUid
                       ('inbound-saml:provider:uid')
        userService -- service exposing getUserByAttribute() and updateUser()

        The user is first searched by externalId. If not found, the legacy
        'passport-saml:uid' value is tried; a user found that way has the
        legacy value replaced by externalId and is persisted.

        Returns the user, or None when neither format matches.
        '''
        newFormat = externalId
        user = userService.getUserByAttribute("oxExternalUid", newFormat, True)
        if user is not None:
            return user

        # The legacy value carries only the uid, which is the third
        # ':'-separated component of the new format. maxsplit=2 keeps any
        # colon inside the uid itself intact.
        uid = externalId.split(':', 2)[-1]
        oldFormat = "passport-%s:%s" % ("saml", uid)
        user = userService.getUserByAttribute("oxExternalUid", oldFormat, True)
        if user is None:
            return None

        # Migrate to newer format: drop the legacy value, keep every other
        # value once, and make sure the new value is present.
        migratedValues = []
        for value in (user.getAttributeValues("oxExternalUid") or []):
            if value != oldFormat and value not in migratedValues:
                migratedValues.append(value)
        if newFormat not in migratedValues:
            migratedValues.append(newFormat)
        user.setAttribute("oxExternalUid", migratedValues, True)
        print("Migrating user's oxExternalUid to newer format 'inbound-saml:provider:uid'")
        userService.updateUser(user)
        return user
class ProfileMapper():
    '''
    Copies attributes from an incoming profile onto a user object.

    mapping -- dict of {user attribute name: profile attribute name}
    '''

    def __init__(self, mapping):
        self.mapping = mapping

    def map(self, profile, user):
        '''
        Sets on user every mapped attribute that is present in profile.

        profile -- dict-like object keyed by profile attribute name
        user    -- object exposing setAttribute(name, value)

        Attributes missing from the profile are skipped, so the user is
        never given an undefined value or one left over from a previous
        attribute. Returns the same user object.
        '''
        for userAttributeName in self.mapping:
            profileAttributeName = self.mapping[userAttributeName]
            print('looking for %s in profile...' % profileAttributeName)
            if profileAttributeName not in profile:
                print('NOT FOUND!')
                continue
            print('FOUND!')
            profileAttributeValue = profile[profileAttributeName]
            print('mapping %s to %s' % (userAttributeName, profileAttributeValue))
            user.setAttribute(userAttributeName, profileAttributeValue)
        return user
class PersonAuthentication(PersonAuthenticationType):
    '''
    Single-step inbound SAML authentication script.

    prepareForStep redirects the browser to the SAML proxy (PROXY_HOST);
    authenticate consumes the profile sent back in the 'user' request
    parameter, provisions the user when needed and logs it in.
    '''

    def __init__(self, currentTimeMillis):
        self.currentTimeMillis = currentTimeMillis

    def init(self, customScript, configurationAttributes):
        print("InboundSaml. Initialization")
        print('configurationATtributes: %s' % configurationAttributes)
        print("InboundSaml. Initialized successfully")
        return True

    def destroy(self, configurationAttributes):
        print("InboundSaml. Destroy")
        print("InboundSaml. Destroyed successfully")
        return True

    def getAuthenticationMethodClaims(self, requestParameters):
        return None

    def getApiVersion(self):
        return 11

    def isValidAuthenticationMethod(self, usageType, configurationAttributes):
        return True

    def getAlternativeAuthenticationMethod(self, usageType, configurationAttributes):
        return None

    def _getProviderHost(self):
        '''
        Reads the provider host from the current session.

        The session attribute 'providerHost' holds URL-safe base64 of a JSON
        object with a 'providerHost' key. Returns that value, or None when
        the attribute is missing or cannot be decoded.
        '''
        identity = CdiUtil.bean(Identity)
        sessionAttributes = identity.getSessionId().getSessionAttributes()
        encoded = sessionAttributes.get("providerHost")
        if encoded is None:
            print('Inbound-saml: providerHost is missing from the session attributes')
            return None
        encoded = str(encoded)
        # base64 padding is frequently stripped from URL parameters; restore
        # it so urlsafe_b64decode does not reject an otherwise valid value.
        encoded += '=' * (-len(encoded) % 4)
        try:
            return json.loads(base64.urlsafe_b64decode(encoded))['providerHost']
        except (TypeError, ValueError, KeyError):
            print('Inbound-saml: providerHost session attribute could not be decoded')
            return None

    def authenticate(self, configurationAttributes, requestParameters, step):
        '''
        Logs in the user described by the 'user' request parameter.

        The parameter is URL-encoded UTF-8 JSON. Its attributes are mapped
        through MAPPING onto a new User; the user is looked up by
        oxExternalUid and created when not found.

        Returns the result of AuthenticationService.authenticate, or False
        when the profile or the session providerHost is unavailable.
        '''
        print('Entered authenticate for step %s' % step)
        authenticationService = CdiUtil.bean(AuthenticationService)

        userParam = requestParameters.get('user')
        if userParam is None or len(userParam) == 0:
            print('Inbound-saml: no user profile found in request parameters')
            return False
        profile = json.loads(unquote(userParam[0]).decode('utf8'))

        profileMapper = ProfileMapper(MAPPING)
        emptyUser = User()
        mappedUser = profileMapper.map(profile, emptyUser)

        # setup oxTrust attributes
        print('Inbound-saml: Setting oxtrust attributes...')
        # The profile is keyed by OID elsewhere in this script, so fall back
        # to the mapped OID when the friendly 'mail' key is absent.
        mail = profile.get('mail')
        if mail is None:
            mail = profile.get(MAPPING['mail'])
        if mail is None:
            print('Inbound-saml: no mail in profile, skipping oxTrustEmail')
        else:
            # json.dumps escapes the value so the attribute stays valid JSON
            mappedUser.setAttribute('oxTrustEmail', ['{"value":%s, "primary":false}' % json.dumps(mail)])

        providerHost = self._getProviderHost()
        if providerHost is None:
            return False

        helper = Helper()
        oxExternalUid = helper.generateExternalUid(providerHost, profile)
        userService = CdiUtil.bean(UserService)
        foundUser = userService.getUserByAttribute("oxExternalUid", oxExternalUid, True)
        if foundUser is None:
            # first login through this provider: provision the mapped user
            mappedUser.setAttribute('oxExternalUid', oxExternalUid)
            foundUser = userService.addUser(mappedUser, True)
        username = foundUser.getUserId()
        logged_in = authenticationService.authenticate(username)
        return logged_in

    def prepareForStep(self, configurationAttributes, requestParameters, step):
        '''
        Redirects step 1 to the SAML proxy for the session's provider host.

        Returns True when the step was prepared, False when the provider
        host cannot be read from the session.
        '''
        from urllib import quote

        print('Entered prepareForStep - step %s' % step)
        providerHost = self._getProviderHost()
        if providerHost is None:
            return False
        print('Inbound-saml: Preparing to start flow with providerHost %s' % providerHost)
        if step == 1:
            facesService = CdiUtil.bean(FacesService)
            # providerHost comes from the session, so encode it before
            # placing it in the query string.
            encodedHost = quote(providerHost.encode('utf8'), safe='')
            redirectUrl = 'https://%s/inbound-saml/sp/authenticate?providerHost=%s' % (PROXY_HOST, encodedHost)
            facesService.redirectToExternalURL(redirectUrl)
        # Report success explicitly; falling through would return None (falsy).
        return True

    def getExtraParametersForStep(self, configurationAttributes, step):
        return None

    def getCountAuthenticationSteps(self, configurationAttributes):
        return 1

    def getPageForStep(self, configurationAttributes, step):
        return ""

    def getNextStep(self, configurationAttributes, requestParameters, step):
        return -1

    def getLogoutExternalUrl(self, configurationAttributes, requestParameters):
        print("Get external logout URL call")
        return None

    def logout(self, configurationAttributes, requestParameters):
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment