Skip to content

Instantly share code, notes, and snippets.

@benwtr
Created November 9, 2011 07:44
Show Gist options
  • Save benwtr/1350749 to your computer and use it in GitHub Desktop.
Save benwtr/1350749 to your computer and use it in GitHub Desktop.
Duo Security post-auth script for OpenVPN Access Server
# This is a post-auth script to integrate Duo Security with OpenVPN Access Server
#
# glued together from openvpn-as sample code and duo's community openvpn python
# integration script
#
# Disclaimer: I am not a python programmer
#
import os, sys, urllib, hashlib, httplib, hmac, base64, json, syslog, time, re
from pyovpn.plugin import *
# Not referenced anywhere else in this file.
# NOTE(review): presumably read by the Access Server plugin loader to decide
# how this hook is invoked -- confirm against the AS post_auth documentation.
SYNCHRONOUS=True
# Specifies the challenge text. The AS will place this string
# in client profiles so that clients can prompt the user for
# a response. It is also the prompt passed to crstate.challenge_post_auth()
# when a dynamic challenge is issued.
STATIC_CHALLENGE = 'Enter Duo passcode, "push", "phone", or "sms" for passcodes'
# Should user's response be echoed?
STATIC_CHALLENGE_ECHO = True
# Duo API settings, left empty here and to be filled in per installation.
# IKEY is sent as the user part of the Basic Authorization header, SKEY is
# the HMAC-SHA1 key used to sign each request, and HOST is the server the
# HTTPS connection is opened to (port 443).
IKEY = ''
SKEY = ''
HOST = ''
# Values of the 'result' field of a Duo API response that this script
# distinguishes in preauth(), auth() and post_auth_cr().
API_RESULT_AUTH = 'auth'
API_RESULT_ALLOW = 'allow'
API_RESULT_DENY = 'deny'
API_RESULT_ENROLL = 'enroll'
def canonicalize(method, host, uri, params):
    """Return the canonical request string that gets signed.

    The result is four newline-separated fields: the upper-cased method,
    the lower-cased host, the uri as given, and the parameters rendered as
    ``key=value`` pairs sorted by key and joined with ``&``.  Keys and
    values are percent-encoded with only ``~`` added to the always-safe
    characters, so ``/`` is encoded as well.
    """
    verb = method.upper()
    hostname = host.lower()
    encoded_pairs = [
        '%s=%s' % (urllib.quote(name, '~'), urllib.quote(params[name], '~'))
        for name in sorted(params)
    ]
    return '\n'.join([verb, hostname, uri, '&'.join(encoded_pairs)])
def sign(ikey, skey, method, host, uri, params):
    """Return the value of the Authorization header for a request.

    The canonical form of the request is signed with HMAC-SHA1 using
    ``skey``; the header is ``Basic`` followed by the base64 encoding of
    ``ikey:<hex digest>``.
    """
    canonical_request = canonicalize(method, host, uri, params)
    digest = hmac.new(skey, canonical_request, hashlib.sha1).hexdigest()
    credentials = '%s:%s' % (ikey, digest)
    return 'Basic %s' % base64.b64encode(credentials)
def call(ikey, skey, host, method, path, **kwargs):
    """Send one signed HTTPS request and return the raw result.

    ``kwargs`` are the request parameters.  For POST and PUT they are sent
    form-encoded in the body; for any other method they are appended to
    ``path`` as a query string.

    Returns a ``(status, reason, data)`` tuple taken from the HTTP
    response, where ``data`` is the unparsed response body.  Exceptions
    raised by ``httplib`` propagate to the caller.
    """
    headers = {'Authorization': sign(ikey, skey, method, host, path, kwargs)}
    encoded = urllib.urlencode(kwargs, doseq=True)
    if method in ('POST', 'PUT'):
        headers['Content-type'] = 'application/x-www-form-urlencoded'
        body = encoded
        uri = path
    else:
        body = None
        uri = path + '?' + encoded

    # NOTE(review): httplib only verifies the server certificate by default
    # from Python 2.7.9 onward -- confirm which interpreter the AS bundles.
    conn = httplib.HTTPSConnection(host, 443)
    try:
        conn.request(method, uri, body, headers)
        response = conn.getresponse()
        data = response.read()
    finally:
        # Close the connection even when the request or the read fails.
        conn.close()
    return (response.status, response.reason, data)
def api(ikey, skey, host, method, path, **kwargs):
    """Call the API and return the ``response`` member of its JSON reply.

    Raises RuntimeError when the HTTP status is not 200, when the body is
    not JSON of the expected shape, or when the reply's ``stat`` member is
    not ``'OK'``.
    """
    (status, reason, data) = call(ikey, skey, host, method, path, **kwargs)
    if status != 200:
        raise RuntimeError('Received %s %s: %s' % (status, reason, data))
    try:
        data = json.loads(data)
        if data['stat'] == 'OK':
            return data['response']
    except (ValueError, KeyError, TypeError):
        # ValueError: body is not JSON.  KeyError: 'stat' or 'response' is
        # missing.  TypeError: the JSON value is not an object (e.g. a list
        # or a string), so it cannot be indexed by key.
        raise RuntimeError('Received bad response: %s' % data)
    raise RuntimeError('Received error response: %s' % data)
def log(msg):
    """Write ``msg`` to syslog, prefixed with ``Duo OpenVPN: ``."""
    syslog.syslog('Duo OpenVPN: %s' % msg)
def preauth(username):
    """Run the pre-authentication call for ``username`` and return its result.

    Posts to ``/rest/v1/preauth`` and returns the ``result`` member of the
    reply.  The value is compared against the API_RESULT_* constants; an
    unrecognised value is logged and still returned.  For every result
    other than API_RESULT_AUTH the reply must also carry a ``status``
    member, which is written to the log.

    Raises RuntimeError when ``result`` (or the required ``status``) is
    missing or empty; errors from api() propagate unchanged.
    """
    log('pre-authentication for %s' % username)
    response = api(IKEY, SKEY, HOST, 'POST', '/rest/v1/preauth', user=username)

    result = response.get('result')
    if not result:
        message = 'invalid API response: %s' % response
        log(message)
        raise RuntimeError(message)

    # 'auth' means the caller must go on to a second-factor prompt; no
    # status text is required in that case.
    if result == API_RESULT_AUTH:
        return result

    status = response.get('status')
    if not status:
        message = 'invalid API response: %s' % response
        log(message)
        raise RuntimeError(message)

    if result == API_RESULT_ENROLL:
        log('user %s is not enrolled: %s' % (username, status))
    elif result == API_RESULT_DENY:
        log('preauth failure for %s: %s' % (username, status))
    elif result == API_RESULT_ALLOW:
        log('preauth success for %s: %s' % (username, status))
    else:
        log('unknown preauth result: %s' % result)
    return result
def auth(username, password, ipaddr):
    """Run the second-factor authentication call and return its result.

    Posts to ``/rest/v1/auth`` with ``factor='auto'``; ``password`` is sent
    as the ``auto`` parameter (post_auth_cr passes the user's challenge
    response here) and ``ipaddr`` as ``ipaddr``.

    Returns the ``result`` member of the reply, which callers compare
    against API_RESULT_ALLOW.  An unrecognised value is logged and still
    returned.

    Raises RuntimeError when ``result`` or ``status`` is missing or empty;
    errors from api() propagate unchanged.
    """
    log('authentication for %s' % username)
    args = {
        'user': username,
        'factor': 'auto',
        'auto': password,
        'ipaddr': ipaddr,
    }
    response = api(IKEY, SKEY, HOST, 'POST', '/rest/v1/auth', **args)

    result = response.get('result')
    status = response.get('status')
    if not result or not status:
        message = 'invalid API response: %s' % response
        log(message)
        raise RuntimeError(message)

    if result == API_RESULT_ALLOW:
        log('auth success for %s: %s' % (username, status))
    elif result == API_RESULT_DENY:
        log('auth failure for %s: %s' % (username, status))
    else:
        log('unknown auth result: %s' % result)
    return result
def post_auth_cr(authcred, attributes, authret, info, crstate):
    """Post-auth hook that adds a Duo challenge/response step.

    authcred   -- dict; 'username', 'client_ip_addr' and the optional
                  'static_response' are read from it.
    attributes -- only printed here.
    authret    -- dict describing the authentication outcome so far; it is
                  updated in place ('status', 'reason', 'client_reason')
                  and returned.
    info       -- dict; 'auth_method' is read from it.
    crstate    -- challenge/response state object supplied by the caller.

    Sessions and autologin clients are passed through untouched.
    Otherwise a supplied response is verified with auth(); a missing
    response after a challenge fails the login; and a first request
    triggers preauth() and, if required, a challenge.
    """
    # see post_auth.txt for a detailed description of these members
    # NOTE(review): authcred may include the user's password -- confirm, and
    # consider removing this debug dump before production use.
    print("**********************************************")
    print("AUTHCRED %s" % (authcred,))
    print("ATTRIBUTES %s" % (attributes,))
    print("AUTHRET %s" % (authret,))
    print("INFO %s" % (info,))
    print("**********************************************")

    # Don't do challenge/response on sessions or autologin clients.
    # autologin client: a client that has been issued a special
    #   certificate allowing authentication with only a certificate
    #   (used for unattended clients such as servers).
    # session: a client that has already authenticated and received
    #   a session token.  The client is attempting to authenticate
    #   again using the session token.
    if info.get('auth_method') in ('session', 'autologin'):
        return authret

    # Only the part of the user name before the first '-' is sent to Duo.
    username = re.split('-', authcred['username'])[0]
    ipaddr = authcred['client_ip_addr']

    # was response provided? -- we support responses issued for both
    # static and dynamic challenges
    duo_pass = authcred.get('static_response')  # sent along with username/password
    if not duo_pass:
        duo_pass = crstate.response()  # response to dynamic challenge

    if duo_pass:
        # received response
        crstate.expire()
        try:
            if auth(username, duo_pass, ipaddr) == API_RESULT_ALLOW:
                authret['status'] = SUCCEED
                authret['reason'] = "Duo authentication succeeded"
            else:
                authret['status'] = FAIL
                authret['reason'] = "Duo authentication failed"
        except Exception as e:
            log(str(e))
            authret['status'] = FAIL
            authret['reason'] = "Exception caught in auth: %s" % e
        # allow end user to see actual error text
        authret['client_reason'] = authret['reason']
    elif crstate.get('challenge'):
        # received an empty or null response after challenge issued
        crstate.expire()  # make sure to expire crstate at the end of the challenge/response transaction
        authret['status'] = FAIL
        authret['reason'] = "No response was provided to Duo challenge"
        # allow end user to see actual error text
        authret['client_reason'] = authret['reason']
    else:
        # initial auth request without static response; issue challenge
        try:
            result = preauth(username)
            if result == API_RESULT_AUTH:
                crstate['challenge'] = True  # save state indicating challenge has been issued
                crstate.challenge_post_auth(authret, STATIC_CHALLENGE, echo=STATIC_CHALLENGE_ECHO)
            elif result != API_RESULT_ALLOW:
                # deny, enroll or an unknown result: fail closed instead of
                # letting the login through without a second factor.
                authret['status'] = FAIL
                authret['reason'] = "Duo pre-authentication did not allow access"
                # allow end user to see actual error text
                authret['client_reason'] = authret['reason']
        except Exception as e:
            log(str(e))
            authret['status'] = FAIL
            authret['reason'] = "Exception caught in pre-auth: %s" % e
    return authret
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment