Created
November 9, 2011 07:44
-
-
Save benwtr/1350749 to your computer and use it in GitHub Desktop.
Duo Security post-auth script for OpenVPN Access Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is a post-auth script to integrate Duo Security with OpenVPN Access Server.
#
# Glued together from the OpenVPN-AS sample code and Duo's community OpenVPN
# Python integration script.
#
# Disclaimer: I am not a Python programmer.
#
import base64
import hashlib
import hmac
import json
import os
import re
import sys
import syslog
import time
import urllib

try:  # Python 2 module layout
    import httplib
    from urllib import quote, urlencode
except ImportError:  # Python 3 module layout
    import http.client as httplib
    from urllib.parse import quote, urlencode

from pyovpn.plugin import *
# NOTE(review): presumably tells the Access Server to run this hook
# synchronously -- confirm against the pyovpn post_auth documentation.
SYNCHRONOUS = True

# Specifies the challenge text.  The AS will place this string in client
# profiles so that clients can prompt the user for a response.
STATIC_CHALLENGE = 'Enter Duo passcode, "push", "phone", or "sms" for passcodes'

# Should the user's response be echoed?
STATIC_CHALLENGE_ECHO = True

# Duo API credentials, passed to every api() call below:
#   IKEY -- user half of the Basic credential built by sign()
#   SKEY -- HMAC key used by sign() to sign each request
#   HOST -- hostname the HTTPS request is sent to
# All three are blank here; presumably they are filled in per deployment.
IKEY = ''
SKEY = ''
HOST = ''

# Values compared against the 'result' field of Duo API responses.
API_RESULT_AUTH = 'auth'
API_RESULT_ALLOW = 'allow'
API_RESULT_DENY = 'deny'
API_RESULT_ENROLL = 'enroll'
def canonicalize(method, host, uri, params):
    """Build the canonical request string that sign() feeds to HMAC.

    The result is four newline-separated fields: the upper-cased HTTP
    method, the lower-cased host, the URI exactly as given, and the
    parameters rendered as ``key=value`` pairs joined with ``&`` in sorted
    key order.  Keys and values are percent-encoded with only ``~`` added
    to the always-safe characters, so ``/`` and spaces are escaped.

    method -- HTTP verb, any case
    host   -- API hostname, any case
    uri    -- request path
    params -- dict mapping parameter names to string values

    Returns the canonical string.
    """
    # ``quote`` is bound at import time to whichever urllib flavour the
    # running interpreter provides, so this works on Python 2 and 3.
    pairs = ['%s=%s' % (quote(key, '~'), quote(params[key], '~'))
             for key in sorted(params)]
    return '\n'.join([method.upper(), host.lower(), uri, '&'.join(pairs)])
def sign(ikey, skey, method, host, uri, params):
    """Return the HTTP ``Authorization`` header value for an API request.

    The canonical form of the request (see canonicalize) is signed with
    HMAC-SHA1 keyed by ``skey``.  The hex digest becomes the password half
    of a Basic credential whose user half is ``ikey``.

    ikey   -- integration key (user half of the credential)
    skey   -- secret key used as the HMAC key
    method, host, uri, params -- the request being signed

    Returns a string of the form ``'Basic <base64>'``.
    """
    def as_bytes(text):
        # hmac and base64 operate on byte strings; text (Python 2 unicode
        # or Python 3 str) has to be encoded first.
        return text if isinstance(text, bytes) else text.encode('utf-8')

    canon = canonicalize(method, host, uri, params)
    digest = hmac.new(as_bytes(skey), as_bytes(canon), hashlib.sha1).hexdigest()
    token = base64.b64encode(as_bytes('%s:%s' % (ikey, digest)))
    if not isinstance(token, str):
        # Python 3: b64encode returns bytes, but the header value is text.
        token = token.decode('ascii')
    return 'Basic %s' % token
def call(ikey, skey, host, method, path, **kwargs):
    """Send one signed HTTPS request to ``host`` and return the raw reply.

    For POST and PUT the keyword arguments are form-encoded into the
    request body; for any other method they are appended to ``path`` as a
    query string.  The request is signed by sign() over the same
    parameters.

    ikey, skey -- credentials handed to sign()
    host       -- hostname to connect to on port 443
    method     -- HTTP verb; compared case-sensitively against 'POST'/'PUT'
    path       -- request path
    **kwargs   -- request parameters

    Returns a ``(status, reason, data)`` tuple taken from the HTTP response.
    Exceptions raised by the HTTP layer propagate to the caller.
    """
    headers = {'Authorization': sign(ikey, skey, method, host, path, kwargs)}
    encoded = urlencode(kwargs, doseq=True)
    if method in ('POST', 'PUT'):
        headers['Content-type'] = 'application/x-www-form-urlencoded'
        body, uri = encoded, path
    else:
        body, uri = None, path + '?' + encoded

    # NOTE(review): no timeout is passed, so a stalled connection blocks the
    # caller for as long as the socket layer allows -- consider adding one.
    conn = httplib.HTTPSConnection(host, 443)
    try:
        conn.request(method, uri, body, headers)
        response = conn.getresponse()
        data = response.read()
    finally:
        # Release the socket even when the request or the read fails.
        conn.close()
    return (response.status, response.reason, data)
def api(ikey, skey, host, method, path, **kwargs):
    """Call the API via call() and return the ``response`` member of its JSON reply.

    Arguments are forwarded unchanged to call().

    Returns the value stored under ``'response'`` in the decoded JSON body.

    Raises RuntimeError when
      * the HTTP status is not 200,
      * the body is not valid JSON, is not a JSON object, or lacks the
        ``'stat'`` / ``'response'`` members,
      * ``'stat'`` is anything other than ``'OK'``.
    """
    status, reason, data = call(ikey, skey, host, method, path, **kwargs)
    if status != 200:
        raise RuntimeError('Received %s %s: %s' % (status, reason, data))

    try:
        payload = json.loads(data)
        stat = payload['stat']
        # Only a successful reply is required to carry 'response'.
        response = payload['response'] if stat == 'OK' else None
    except (ValueError, KeyError, TypeError):
        # TypeError covers valid JSON that is not an object (list, string,
        # number), where indexing by 'stat' is not possible.
        raise RuntimeError('Received bad response: %s' % data)

    if stat != 'OK':
        raise RuntimeError('Received error response: %s' % payload)
    return response
def log(msg):
    """Write ``msg`` to syslog, prefixed with ``'Duo OpenVPN: '``."""
    syslog.syslog('Duo OpenVPN: %s' % msg)
def preauth(username):
    """Ask the ``/rest/v1/preauth`` endpoint how ``username`` should be handled.

    The outcome is logged and the ``result`` field of the API response is
    returned unchanged; callers compare it with the API_RESULT_* constants.
    A result of API_RESULT_AUTH is returned immediately; every other result
    must be accompanied by a non-empty ``status`` field.

    Raises RuntimeError when ``result`` is missing or empty, or when
    ``status`` is missing or empty for a non-'auth' result.  Errors raised
    by api() propagate.
    """
    log('pre-authentication for %s' % username)
    response = api(IKEY, SKEY, HOST, 'POST', '/rest/v1/preauth', user=username)

    def reject():
        # Log the malformed response, then abort with the same text.
        problem = 'invalid API response: %s' % response
        log(problem)
        raise RuntimeError(problem)

    result = response.get('result')
    if not result:
        reject()
    if result == API_RESULT_AUTH:
        return result

    status = response.get('status')
    if not status:
        reject()

    if result == API_RESULT_ALLOW:
        message = 'preauth success for %s: %s' % (username, status)
    elif result == API_RESULT_DENY:
        message = 'preauth failure for %s: %s' % (username, status)
    elif result == API_RESULT_ENROLL:
        message = 'user %s is not enrolled: %s' % (username, status)
    else:
        message = 'unknown preauth result: %s' % result
    log(message)
    return result
def auth(username, password, ipaddr):
    """Submit the user's second-factor response to ``/rest/v1/auth``.

    The request uses ``factor='auto'`` and passes ``password`` as the
    ``auto`` parameter, together with the user name and client address.

    username -- user name sent as ``user``
    password -- the user's challenge response
    ipaddr   -- client IP address sent as ``ipaddr``

    Returns the ``result`` field of the API response after logging it;
    callers compare it with the API_RESULT_* constants.

    Raises RuntimeError when ``result`` or ``status`` is missing or empty.
    Errors raised by api() propagate.
    """
    log('authentication for %s' % username)
    response = api(IKEY, SKEY, HOST, 'POST', '/rest/v1/auth',
                   user=username, factor='auto', auto=password, ipaddr=ipaddr)

    result, status = response.get('result'), response.get('status')
    if not (result and status):
        problem = 'invalid API response: %s' % response
        log(problem)
        raise RuntimeError(problem)

    if result == API_RESULT_ALLOW:
        message = 'auth success for %s: %s' % (username, status)
    elif result == API_RESULT_DENY:
        message = 'auth failure for %s: %s' % (username, status)
    else:
        message = 'unknown auth result: %s' % result
    log(message)
    return result
def post_auth_cr(authcred, attributes, authret, info, crstate):
    """Post-auth hook that adds a Duo second factor via challenge/response.

    authcred   -- mapping read for 'username', 'client_ip_addr' and the
                  optional 'static_response'
    attributes -- only printed here
    authret    -- result dict; updated in place and returned
    info       -- mapping read for 'auth_method'
    crstate    -- challenge/response state object (response(), expire(),
                  get(), item assignment and challenge_post_auth() are used)

    Flow:
      * 'session' and 'autologin' logins are returned untouched.
      * If a response is available (static or dynamic) it is verified with
        auth(); anything but API_RESULT_ALLOW fails the login.
      * If a challenge was issued but no response came back, the login fails.
      * Otherwise preauth() decides: API_RESULT_AUTH issues the challenge,
        API_RESULT_ALLOW leaves authret untouched, and every other result
        (deny, enroll, unknown) fails the login.

    Returns authret.
    """
    # see post_auth.txt for a detailed description of these members
    # NOTE(review): authcred presumably includes the user's credentials, so
    # these debug prints may leak them into logs -- consider removing.
    print("**********************************************")
    print("AUTHCRED %s" % (authcred,))
    print("ATTRIBUTES %s" % (attributes,))
    print("AUTHRET %s" % (authret,))
    print("INFO %s" % (info,))
    print("**********************************************")

    # Don't do challenge/response on sessions or autologin clients.
    #   autologin client: a client that has been issued a special
    #     certificate allowing authentication with only a certificate
    #     (used for unattended clients such as servers).
    #   session: a client that has already authenticated and received
    #     a session token.  The client is attempting to authenticate
    #     again using the session token.
    if info.get('auth_method') in ('session', 'autologin'):
        return authret

    # Only the part of the login name before the first '-' is sent to Duo.
    username = re.split('-', authcred['username'])[0]
    ipaddr = authcred['client_ip_addr']

    # was response provided? -- we support responses issued for both
    # static and dynamic challenges
    duo_pass = authcred.get('static_response')  # sent along with username/password
    if not duo_pass:
        duo_pass = crstate.response()  # response to dynamic challenge

    if duo_pass:
        # received response
        crstate.expire()
        try:
            if auth(username, duo_pass, ipaddr) == API_RESULT_ALLOW:
                authret['status'] = SUCCEED
                authret['reason'] = "Duo authentication succeeded"
            else:
                authret['status'] = FAIL
                authret['reason'] = "Duo authentication failed"
        except Exception as e:
            log(str(e))
            authret['status'] = FAIL
            authret['reason'] = "Exception caught in auth: %s" % e
        # allow end user to see actual error text
        authret['client_reason'] = authret['reason']

    elif crstate.get('challenge'):
        # received an empty or null response after challenge issued
        crstate.expire()  # always expire crstate at the end of the transaction
        authret['status'] = FAIL
        authret['reason'] = "No response was provided to Duo challenge"
        # allow end user to see actual error text
        authret['client_reason'] = authret['reason']

    else:
        # initial auth request without static response; ask Duo what to do
        try:
            result = preauth(username)
            if result == API_RESULT_AUTH:
                crstate['challenge'] = True  # remember that a challenge was issued
                crstate.challenge_post_auth(authret, STATIC_CHALLENGE, echo=STATIC_CHALLENGE_ECHO)
            elif result != API_RESULT_ALLOW:
                # deny, enroll or an unknown result: refuse the login rather
                # than letting the primary-auth result stand without a
                # second factor.
                authret['status'] = FAIL
                authret['reason'] = "Duo pre-authentication did not allow login (result: %s)" % result
                authret['client_reason'] = authret['reason']
        except Exception as e:
            log(str(e))
            authret['status'] = FAIL
            authret['reason'] = "Exception caught in pre-auth: %s" % e
            # allow end user to see actual error text
            authret['client_reason'] = authret['reason']

    return authret
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment