Skip to content

Instantly share code, notes, and snippets.

@pthrasher
Created December 6, 2018 19:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pthrasher/951823a0e8f33abe91ac1e73b80942cf to your computer and use it in GitHub Desktop.
Save pthrasher/951823a0e8f33abe91ac1e73b80942cf to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#
# duo_openvpn_as.py
#
# Copyright 2012 Duo Security, Inc.
# All rights reserved, all wrongs reversed.
# ------------------------------------------------------------------
# Fill in your integration credentials on the following three lines:
IKEY = '<DUO INTEGRATION KEY HERE>'
SKEY = '<DUO INTEGRATION SECRET KEY HERE>'
HOST = '<DUO API HOSTNAME HERE>'
# To use an HTTPS proxy, enter its address below. If PROXY_HOST is
# left blank no proxy will be used. When it is set, the API client is
# configured to tunnel through it using HTTP CONNECT.
PROXY_HOST = ''
# Port of the proxy above; only consulted when PROXY_HOST is non-empty.
PROXY_PORT = 8080
# Set SKIP_DUO_ON_VPN_AUTH to True to skip Duo authentication for VPN
# connections. Two-factor will only be required for other
# authentications (like web server access).
SKIP_DUO_ON_VPN_AUTH = False
# ------------------------------------------------------------------
import syslog
import tempfile
import traceback
from pyovpn.plugin import (SUCCEED, FAIL)
# NOTE(review): SYNCHRONOUS is not referenced anywhere else in this file;
# presumably it is read by the OpenVPN Access Server plugin loader -- confirm.
SYNCHRONOUS=False
# Values of the 'result' field in Duo preauth/auth API responses. The
# integration code below compares API results against these constants.
API_RESULT_AUTH = 'auth'
API_RESULT_ALLOW = 'allow'
API_RESULT_DENY = 'deny'
API_RESULT_ENROLL = 'enroll'
CA_CERT = """
subject= /C=US/O=DigiCert Inc/OU=www.digicert.com/CN=DigiCert Assured ID Root CA
-----BEGIN CERTIFICATE-----
MIIDtzCCAp+gAwIBAgIQDOfg5RfYRv6P5WD8G/AwOTANBgkqhkiG9w0BAQUFADBl
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSQwIgYDVQQDExtEaWdpQ2VydCBBc3N1cmVkIElEIFJv
b3QgQ0EwHhcNMDYxMTEwMDAwMDAwWhcNMzExMTEwMDAwMDAwWjBlMQswCQYDVQQG
EwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3d3cuZGlnaWNl
cnQuY29tMSQwIgYDVQQDExtEaWdpQ2VydCBBc3N1cmVkIElEIFJvb3QgQ0EwggEi
MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCtDhXO5EOAXLGH87dg+XESpa7c
JpSIqvTO9SA5KFhgDPiA2qkVlTJhPLWxKISKityfCgyDF3qPkKyK53lTXDGEKvYP
mDI2dsze3Tyoou9q+yHyUmHfnyDXH+Kx2f4YZNISW1/5WBg1vEfNoTb5a3/UsDg+
wRvDjDPZ2C8Y/igPs6eD1sNuRMBhNZYW/lmci3Zt1/GiSw0r/wty2p5g0I6QNcZ4
VYcgoc/lbQrISXwxmDNsIumH0DJaoroTghHtORedmTpyoeb6pNnVFzF1roV9Iq4/
AUaG9ih5yLHa5FcXxH4cDrC0kqZWs72yl+2qp/C3xag/lRbQ/6GW6whfGHdPAgMB
AAGjYzBhMA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQW
BBRF66Kv9JLLgjEtUYunpyGd823IDzAfBgNVHSMEGDAWgBRF66Kv9JLLgjEtUYun
pyGd823IDzANBgkqhkiG9w0BAQUFAAOCAQEAog683+Lt8ONyc3pklL/3cmbYMuRC
dWKuh+vy1dneVrOfzM4UKLkNl2BcEkxY5NM9g0lFWJc1aRqoR+pWxnmrEthngYTf
fwk8lOa4JiwgvT2zKIn3X/8i4peEH+ll74fg38FnSbNd67IJKusm7Xi+fT8r87cm
NW1fiQG2SVufAQWbqz0lwcy2f8Lxb4bG+mRo64EtlOtCt/qMHt1i8b5QZ7dsvfPx
H2sMNgcWfzd8qVttevESRmCD1ycEvkvOl77DZypoEd+A5wwzZr8TDRRu838fYxAe
+o0bJW1sj6W3YQGx0qMmoRBxna3iw/nDmVG3KwcIzi7mULKn+gpFL6Lw8g==
-----END CERTIFICATE-----
subject= /C=US/O=DigiCert Inc/OU=www.digicert.com/CN=DigiCert Global Root CA
-----BEGIN CERTIFICATE-----
MIIDrzCCApegAwIBAgIQCDvgVpBCRrGhdWrJWZHHSjANBgkqhkiG9w0BAQUFADBh
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBD
QTAeFw0wNjExMTAwMDAwMDBaFw0zMTExMTAwMDAwMDBaMGExCzAJBgNVBAYTAlVT
MRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5j
b20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IENBMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4jvhEXLeqKTTo1eqUKKPC3eQyaKl7hLOllsB
CSDMAZOnTjC3U/dDxGkAV53ijSLdhwZAAIEJzs4bg7/fzTtxRuLWZscFs3YnFo97
nh6Vfe63SKMI2tavegw5BmV/Sl0fvBf4q77uKNd0f3p4mVmFaG5cIzJLv07A6Fpt
43C/dxC//AH2hdmoRBBYMql1GNXRor5H4idq9Joz+EkIYIvUX7Q6hL+hqkpMfT7P
T19sdl6gSzeRntwi5m3OFBqOasv+zbMUZBfHWymeMr/y7vrTC0LUq7dBMtoM1O/4
gdW7jVg/tRvoSSiicNoxBN33shbyTApOB6jtSj1etX+jkMOvJwIDAQABo2MwYTAO
BgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUA95QNVbR
TLtm8KPiGxvDl7I90VUwHwYDVR0jBBgwFoAUA95QNVbRTLtm8KPiGxvDl7I90VUw
DQYJKoZIhvcNAQEFBQADggEBAMucN6pIExIK+t1EnE9SsPTfrgT1eXkIoyQY/Esr
hMAtudXH/vTBH1jLuG2cenTnmCmrEbXjcKChzUyImZOMkXDiqw8cvpOp/2PV5Adg
06O/nVsJ8dWO41P0jmP6P6fbtGbfYmbW0W5BjfIttep3Sp+dWOIrWcBAI+0tKIJF
PnlUkiaY4IBIqDfv8NZ5YBberOgOzW6sRBc4L0na4UU+Krk2U886UAb3LujEV0ls
YSEY1QSteDwsOoBrp+uvFRTp2InBuThs4pFsiv9kuXclVzDAGySj4dzp30d8tbQk
CAUw7C29C79Fv1C5qfPrmAESrciIxpg0X40KPMbp1ZWVbd4=
-----END CERTIFICATE-----
subject= /C=US/O=DigiCert Inc/OU=www.digicert.com/CN=DigiCert High Assurance EV Root CA
-----BEGIN CERTIFICATE-----
MIIDxTCCAq2gAwIBAgIQAqxcJmoLQJuPC3nyrkYldzANBgkqhkiG9w0BAQUFADBs
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSswKQYDVQQDEyJEaWdpQ2VydCBIaWdoIEFzc3VyYW5j
ZSBFViBSb290IENBMB4XDTA2MTExMDAwMDAwMFoXDTMxMTExMDAwMDAwMFowbDEL
MAkGA1UEBhMCVVMxFTATBgNVBAoTDERpZ2lDZXJ0IEluYzEZMBcGA1UECxMQd3d3
LmRpZ2ljZXJ0LmNvbTErMCkGA1UEAxMiRGlnaUNlcnQgSGlnaCBBc3N1cmFuY2Ug
RVYgUm9vdCBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMbM5XPm
+9S75S0tMqbf5YE/yc0lSbZxKsPVlDRnogocsF9ppkCxxLeyj9CYpKlBWTrT3JTW
PNt0OKRKzE0lgvdKpVMSOO7zSW1xkX5jtqumX8OkhPhPYlG++MXs2ziS4wblCJEM
xChBVfvLWokVfnHoNb9Ncgk9vjo4UFt3MRuNs8ckRZqnrG0AFFoEt7oT61EKmEFB
Ik5lYYeBQVCmeVyJ3hlKV9Uu5l0cUyx+mM0aBhakaHPQNAQTXKFx01p8VdteZOE3
hzBWBOURtCmAEvF5OYiiAhF8J2a3iLd48soKqDirCmTCv2ZdlYTBoSUeh10aUAsg
EsxBu24LUTi4S8sCAwEAAaNjMGEwDgYDVR0PAQH/BAQDAgGGMA8GA1UdEwEB/wQF
MAMBAf8wHQYDVR0OBBYEFLE+w2kD+L9HAdSYJhoIAu9jZCvDMB8GA1UdIwQYMBaA
FLE+w2kD+L9HAdSYJhoIAu9jZCvDMA0GCSqGSIb3DQEBBQUAA4IBAQAcGgaX3Nec
nzyIZgYIVyHbIUf4KmeqvxgydkAQV8GK83rZEWWONfqe/EW1ntlMMUu4kehDLI6z
eM7b41N5cdblIZQB2lWHmiRk9opmzN6cN82oNLFpmyPInngiK3BD41VHMWEZ71jF
hS9OMPagMRYjyOfiZRYzy78aG6A9+MpeizGLYAiJLQwGXFK3xPkKmNEVX58Svnw2
Yzi9RKR/5CYrCsSXaQ3pjOLAEFe4yHYSkVXySGnYvCoCWw9E1CAx2/S6cCZdkGCe
vEsXCS+0yx5DaMkHJ8HSXPfqIbloEpw8nL+e/IBcm2PN7EeqJSdnoDfzAIJ9VNep
+OkuE6N36B9K
-----END CERTIFICATE-----
subject= /C=US/O=SecureTrust Corporation/CN=SecureTrust CA
-----BEGIN CERTIFICATE-----
MIIDuDCCAqCgAwIBAgIQDPCOXAgWpa1Cf/DrJxhZ0DANBgkqhkiG9w0BAQUFADBI
MQswCQYDVQQGEwJVUzEgMB4GA1UEChMXU2VjdXJlVHJ1c3QgQ29ycG9yYXRpb24x
FzAVBgNVBAMTDlNlY3VyZVRydXN0IENBMB4XDTA2MTEwNzE5MzExOFoXDTI5MTIz
MTE5NDA1NVowSDELMAkGA1UEBhMCVVMxIDAeBgNVBAoTF1NlY3VyZVRydXN0IENv
cnBvcmF0aW9uMRcwFQYDVQQDEw5TZWN1cmVUcnVzdCBDQTCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBAKukgeWVzfX2FI7CT8rU4niVWJxB4Q2ZQCQXOZEz
Zum+4YOvYlyJ0fwkW2Gz4BERQRwdbvC4u/jep4G6pkjGnx29vo6pQT64lO0pGtSO
0gMdA+9tDWccV9cGrcrI9f4Or2YlSASWC12juhbDCE/RRvgUXPLIXgGZbf2IzIao
wW8xQmxSPmjL8xk037uHGFaAJsTQ3MBv396gwpEWoGQRS0S8Hvbn+mPeZqx2pHGj
7DaUaHp3pLHnDi+BeuK1cobvomuL8A/b01k/unK8RCSc43Oz969XL0Imnal0ugBS
8kvNU3xHCzaFDmapCJcWNFfBZveA4+1wVMeT4C4oFVmHursCAwEAAaOBnTCBmjAT
BgkrBgEEAYI3FAIEBh4EAEMAQTALBgNVHQ8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB
/zAdBgNVHQ4EFgQUQjK2FvoE/f5dS3rD/fdMQB1aQ68wNAYDVR0fBC0wKzApoCeg
JYYjaHR0cDovL2NybC5zZWN1cmV0cnVzdC5jb20vU1RDQS5jcmwwEAYJKwYBBAGC
NxUBBAMCAQAwDQYJKoZIhvcNAQEFBQADggEBADDtT0rhWDpSclu1pqNlGKa7UTt3
6Z3q059c4EVlew3KW+JwULKUBRSuSceNQQcSc5R+DCMh/bwQf2AQWnL1mA6s7Ll/
3XpvXdMc9P+IBWlCqQVxyLesJugutIxq/3HcuLHfmbx8IVQr5Fiiu1cprp6poxkm
D5kuCLDv/WnPmRoJjeOnnyvJNjR7JLN4TJUXpAYmHrZkUjZfYGfZnMUFdAvnZyPS
CPyI6a6Lf+Ew9Dd+/cYy2i2eRDAwbO4H3tI0/NL/QPZL9GZGBlSm8jIKYyYwa5vR
3ItHuuG51WLQoqD0ZwV4KWMabwTW+MZMo5qxN7SN5ShLHZ4swrhovO0C7jE=
-----END CERTIFICATE-----
subject= /C=US/O=SecureTrust Corporation/CN=Secure Global CA
-----BEGIN CERTIFICATE-----
MIIDvDCCAqSgAwIBAgIQB1YipOjUiolN9BPI8PjqpTANBgkqhkiG9w0BAQUFADBK
MQswCQYDVQQGEwJVUzEgMB4GA1UEChMXU2VjdXJlVHJ1c3QgQ29ycG9yYXRpb24x
GTAXBgNVBAMTEFNlY3VyZSBHbG9iYWwgQ0EwHhcNMDYxMTA3MTk0MjI4WhcNMjkx
MjMxMTk1MjA2WjBKMQswCQYDVQQGEwJVUzEgMB4GA1UEChMXU2VjdXJlVHJ1c3Qg
Q29ycG9yYXRpb24xGTAXBgNVBAMTEFNlY3VyZSBHbG9iYWwgQ0EwggEiMA0GCSqG
SIb3DQEBAQUAA4IBDwAwggEKAoIBAQCvNS7YrGxVaQZx5RNoJLNP2MwhR/jxYDiJ
iQPpvepeRlMJ3Fz1Wuj3RSoC6zFh1ykzTM7HfAo3fg+6MpjhHZevj8fcyTiW89sa
/FHtaMbQbqR8JNGuQsiWUGMu4P51/pinX0kuleM5M2SOHqRfkNJnPLLZ/kG5VacJ
jnIFHovdRIWCQtBJwB1g8NEXLJXr9qXBkqPFwqcIYA1gBBCWeZ4WNOaptvolRTnI
HmX5k/Wq8VLcmZg9pYYaDDUz+kulBAYVHDGA76oYa8J719rO+TMg1fW9ajMtgQT7
sFzUnKPiXB3jqUJ1XnvUd+85VLrJChgbEplJL4hL/VBi0XPnj3pDAgMBAAGjgZ0w
gZowEwYJKwYBBAGCNxQCBAYeBABDAEEwCwYDVR0PBAQDAgGGMA8GA1UdEwEB/wQF
MAMBAf8wHQYDVR0OBBYEFK9EBMJBfkiD2045AuzshHrmzsmkMDQGA1UdHwQtMCsw
KaAnoCWGI2h0dHA6Ly9jcmwuc2VjdXJldHJ1c3QuY29tL1NHQ0EuY3JsMBAGCSsG
AQQBgjcVAQQDAgEAMA0GCSqGSIb3DQEBBQUAA4IBAQBjGghAfaReUw132HquHw0L
URYD7xh8yOOvaliTFGCRsoTciE6+OYo68+aCiV0BN7OrJKQVDpI1WkpEXk5X+nXO
H0jOZvQ8QCaSmGwb7iRGDBezUqXbpZGRzzfTb+cnCDpOGR86p1hcF895P4vkp9Mm
I50mD1hp/Ed+stCNi5O/KU9DaXR2Z0vPB4zmAve14bRDtUstFJ/53CYNv6ZHdAbY
iNE6KTCEztI5gGIbqMdXSbxqVVFnFUq+NQfk1XWYN3kwFNspnWzFacxHVaIw98xc
f8LDmBxrThaA63p4ZUWiABqvDA1VZDRIuJK58bRQKfJPIx/abKwfROHdI3hRW8cW
-----END CERTIFICATE-----
"""
### OpenVPN Access Server imports post-auth scripts into database
### blobs, so we have to include dependencies inline.
### The following code was adapted from duo_client_python.
import base64
import email.utils
import hashlib
import hmac
import json
import os
import sys
import urllib
def canon_params(params):
    """
    Return the canonical query string for params: 'key=value' pairs,
    percent-quoted (leaving '~' unescaped), ordered by key and joined
    with '&'.
    """
    return '&'.join(
        '%s=%s' % (urllib.quote(name, '~'), urllib.quote(params[name], '~'))
        for name in sorted(params)
    )
def canonicalize(method, host, uri, params, date, sig_version):
    """
    Build the newline-joined canonical request string that gets signed.

    Signature version 1 covers the upper-cased method, lower-cased host,
    uri and canonical parameters; version 2 additionally puts the date
    first. Any other version raises NotImplementedError.
    """
    if sig_version not in (1, 2):
        raise NotImplementedError(sig_version)
    pieces = [date] if sig_version == 2 else []
    pieces.extend([
        method.upper(),
        host.lower(),
        uri,
        canon_params(params),
    ])
    return '\n'.join(pieces)
def sign(ikey, skey, method, host, uri, date, sig_version, params):
    """
    Return basic authorization header line with a Duo Web API signature.

    The canonical request is HMAC-SHA1 signed with skey; the header value
    is 'Basic ' followed by base64 of 'ikey:hexdigest'.
    """
    message = canonicalize(method, host, uri, params, date, sig_version)
    # The HMAC key must be a byte string.
    key = skey.encode('utf-8') if isinstance(skey, unicode) else skey
    digest = hmac.new(key, message, hashlib.sha1).hexdigest()
    credentials = '%s:%s' % (ikey, digest)
    return 'Basic %s' % base64.b64encode(credentials)
def encode_params(params):
    """Returns copy of params with unicode strings utf-8 encoded"""
    def to_utf8(item):
        # Only unicode objects are touched; everything else passes through.
        if isinstance(item, unicode):
            return item.encode("utf-8")
        return item

    return dict((to_utf8(name), to_utf8(val)) for name, val in params.items())
class Client(object):
    """
    Minimal Duo REST API client: signs each request, optionally tunnels
    through an HTTP CONNECT proxy, and decodes JSON responses.
    """

    # Signature scheme version handed to sign() for every request.
    sig_version = 1

    def __init__(self, ikey, skey, host,
                 ca_certs=None,
                 user_agent=None):
        """
        ikey - Duo integration key.
        skey - Duo secret key used to sign requests.
        host - Duo API hostname.
        ca_certs - Path to CA pem file. The special value 'HTTP' makes
                   api_call() target port 80 / 'http' instead of 443.
        user_agent - Optional User-Agent header value.
        """
        self.ikey = ikey
        self.skey = skey
        self.host = host
        self.ca_certs = ca_certs
        # Start with the proxy disabled; callers opt in via set_proxy().
        self.set_proxy(host=None, proxy_type=None)
        self.user_agent = user_agent

    def set_proxy(self, host, port=None, headers=None,
                  proxy_type='CONNECT'):
        """
        Configure proxy for API calls. Supported proxy_type values:
        'CONNECT' - HTTP proxy with CONNECT.
        None - Disable proxy.

        Raises NotImplementedError for any other proxy_type.
        """
        if proxy_type not in ('CONNECT', None):
            raise NotImplementedError('proxy_type=%s' % (proxy_type,))
        self.proxy_headers = headers
        self.proxy_host = host
        self.proxy_port = port
        self.proxy_type = proxy_type

    def api_call(self, method, path, params):
        """
        Call a Duo API method. Return a (response, data) tuple, where
        response is the HTTP response object and data is the body that
        was read from it.

        The connection is always closed before returning or raising.
        """
        # urllib cannot handle unicode strings properly. quote() raises
        # an exception, and urlencode() replaces them with '?'.
        params = encode_params(params)
        now = email.utils.formatdate()
        auth = sign(self.ikey,
                    self.skey,
                    method,
                    self.host,
                    path,
                    now,
                    self.sig_version,
                    params)
        headers = {
            'Authorization': auth,
            'Date': now,
            'Host': self.host,
        }
        if self.user_agent:
            headers['User-Agent'] = self.user_agent
        if method in ['POST', 'PUT']:
            headers['Content-type'] = 'application/x-www-form-urlencoded'
            body = urllib.urlencode(params, doseq=True)
            uri = path
        else:
            body = None
            uri = path + '?' + urllib.urlencode(params, doseq=True)
        # Host and port for the HTTP(S) connection to the API server.
        if self.ca_certs == 'HTTP':
            api_port = 80
            api_proto = 'http'
        else:
            api_port = 443
            api_proto = 'https'
        # Host and port for outer HTTP(S) connection if proxied.
        if self.proxy_type is None:
            host = self.host
            port = api_port
        elif self.proxy_type == 'CONNECT':
            host = self.proxy_host
            port = self.proxy_port
        else:
            raise NotImplementedError('proxy_type=%s' % (self.proxy_type,))
        # Create outer HTTP(S) connection.
        conn = CertValidatingHTTPSConnection(host,
                                             port,
                                             ca_certs=self.ca_certs)
        try:
            # Configure CONNECT proxy tunnel, if any.
            if self.proxy_type == 'CONNECT':
                # Ensure the request has the correct Host.
                uri = ''.join((api_proto, '://', self.host, uri))
                if hasattr(conn, 'set_tunnel'):  # 2.7+
                    conn.set_tunnel(self.host,
                                    api_port,
                                    self.proxy_headers)
                elif hasattr(conn, '_set_tunnel'):  # 2.6.3+
                    # pylint: disable=E1103
                    conn._set_tunnel(self.host,
                                     api_port,
                                     self.proxy_headers)
                    # pylint: enable=E1103
            conn.request(method, uri, body, headers)
            response = conn.getresponse()
            data = response.read()
        finally:
            # Release the socket even when the request or read fails.
            conn.close()
        return (response, data)

    def json_api_call(self, method, path, params):
        """
        Call a Duo API method which is expected to return a JSON body
        with a 200 status. Return the response data structure or raise
        RuntimeError.

        For a non-200 status the raised error carries .status, .reason
        and .data attributes (data is the decoded body when it parsed as
        JSON, otherwise the raw body).
        """
        (response, data) = self.api_call(method, path, params)
        if response.status != 200:
            msg = 'Received %s %s' % (response.status, response.reason)
            try:
                data = json.loads(data)
                if data['stat'] == 'FAIL':
                    if 'message_detail' in data:
                        msg = 'Received %s %s (%s)' % (
                            response.status,
                            data['message'],
                            data['message_detail'],
                        )
                    else:
                        msg = 'Received %s %s' % (
                            response.status,
                            data['message'],
                        )
            except (ValueError, KeyError, TypeError):
                # Body was not the expected JSON shape; keep generic msg.
                pass
            error = RuntimeError(msg)
            error.status = response.status
            error.reason = response.reason
            error.data = data
            raise error
        try:
            data = json.loads(data)
            if data['stat'] != 'OK':
                raise RuntimeError('Received error response: %s' % data)
            return data['response']
        except (ValueError, KeyError, TypeError):
            raise RuntimeError('Received bad response: %s' % data)
### The following code was adapted from:
### https://googleappengine.googlecode.com/svn-history/r136/trunk/python/google/appengine/tools/https_wrapper.py
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import httplib
import re
import socket
import ssl
class InvalidCertificateException(httplib.HTTPException):
    """Raised when a certificate is provided with an invalid hostname."""

    def __init__(self, host, cert, reason):
        """Constructor.

        Args:
          host: The hostname the connection was made to.
          cert: The SSL certificate (as a dictionary) the host returned.
          reason: Short text saying why the certificate was rejected.
        """
        super(InvalidCertificateException, self).__init__()
        self.host, self.cert, self.reason = host, cert, reason

    def __str__(self):
        template = ('Host %s returned an invalid certificate (%s): %s\n'
                    'To learn more, see '
                    'http://code.google.com/appengine/kb/general.html#rpcssl')
        return template % (self.host, self.reason, self.cert)
class CertValidatingHTTPSConnection(httplib.HTTPConnection):
    """An HTTPConnection that connects over SSL and validates certificates."""

    default_port = httplib.HTTPS_PORT

    def __init__(self, host, port=None, key_file=None, cert_file=None,
                 ca_certs=None, strict=None, **kwargs):
        """Constructor.

        Args:
          host: The hostname. Can be in 'host:port' form.
          port: The port. Defaults to 443.
          key_file: A file containing the client's private key
          cert_file: A file containing the client's certificates
          ca_certs: A file containing a set of concatenated certificate
              authority certs for validating the server against.
          strict: When true, causes BadStatusLine to be raised if the status
              line can't be parsed as a valid HTTP/1.0 or 1.1 status line.
        """
        httplib.HTTPConnection.__init__(self, host, port, strict, **kwargs)
        self.key_file = key_file
        self.cert_file = cert_file
        self.ca_certs = ca_certs
        # Without a CA bundle there is nothing to verify the peer against,
        # so certificate checking is switched off entirely.
        if self.ca_certs:
            self.cert_reqs = ssl.CERT_REQUIRED
        else:
            self.cert_reqs = ssl.CERT_NONE

    def _GetValidHostsForCert(self, cert):
        """Returns a list of valid host globs for an SSL certificate.

        DNS entries of subjectAltName are used when that field is present;
        otherwise the subject's commonName entries are used.

        Args:
          cert: A dictionary representing an SSL certificate.
        Returns:
          list: A list of valid host globs.
        """
        if 'subjectAltName' in cert:
            return [x[1] for x in cert['subjectAltName']
                    if x[0].lower() == 'dns']
        else:
            return [x[0][1] for x in cert['subject']
                    if x[0][0].lower() == 'commonname']

    def _ValidateCertificateHostname(self, cert, hostname):
        """Validates that a given hostname is valid for an SSL certificate.

        Matching is case-insensitive; a '*' in a certificate name matches
        any run of characters within a single label (no dots).

        Args:
          cert: A dictionary representing an SSL certificate.
          hostname: The hostname to test.
        Returns:
          bool: Whether or not the hostname is valid for this certificate.
        """
        hosts = self._GetValidHostsForCert(cert)
        for host in hosts:
            # Escape every regex metacharacter so the certificate name is
            # matched literally, then re-enable '*' as a one-label wildcard.
            host_re = re.escape(host).replace(r'\*', '[^.]*')
            if re.search('^%s$' % (host_re,), hostname, re.I):
                return True
        return False

    def connect(self):
        """Connect to a host on a given (SSL) port.

        Opens the TCP connection, sets up the proxy tunnel when one is
        configured, wraps the socket in SSL and, when certificates are
        required, checks the peer certificate against the target hostname.

        Raises:
          InvalidCertificateException: the certificate does not cover the
              hostname being connected to.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        self.sock = sock
        if self._tunnel_host:
            self._tunnel()
        self.sock = ssl.wrap_socket(self.sock, keyfile=self.key_file,
                                    certfile=self.cert_file,
                                    cert_reqs=self.cert_reqs,
                                    ca_certs=self.ca_certs)
        if self.cert_reqs & ssl.CERT_REQUIRED:
            cert = self.sock.getpeercert()
            # When tunnelling, the certificate belongs to the tunnel target,
            # not to the proxy the socket is connected to.
            cert_validation_host = self._tunnel_host or self.host
            # Drop any ':port' suffix. A maxsplit of 0 never splits at all,
            # which left the port attached to the name being validated.
            hostname = cert_validation_host.split(':', 1)[0]
            if not self._ValidateCertificateHostname(cert, hostname):
                raise InvalidCertificateException(hostname, cert,
                                                  'hostname mismatch')
### duo_openvpn_as.py integration code:
# Version of this integration script; OpenVPNIntegration appends it to the
# 'duo_openvpn_as/' prefix to form the client's user agent string.
__version__ = '2.2'
def log(msg):
    """Send msg to syslog, prefixed with 'Duo OpenVPN_AS: '."""
    syslog.syslog('Duo OpenVPN_AS: %s' % msg)
class OpenVPNIntegration(Client):
    """
    Duo API client specialised for this script: identifies itself with a
    'duo_openvpn_as/<version>' user agent, validates the server against
    the inline CA_CERT bundle, and wraps the preauth and auth REST calls.
    """

    def __init__(self, *args, **kwargs):
        """Accepts the same arguments as Client; user_agent is overridden."""
        kwargs['user_agent'] = 'duo_openvpn_as/' + __version__
        super(OpenVPNIntegration, self).__init__(*args, **kwargs)

    def api_call(self, *args, **kwargs):
        """
        Same as Client.api_call, but for the duration of the call ca_certs
        points at a temporary file holding CA_CERT. The previous ca_certs
        value is restored afterwards, even if the call raises.
        """
        orig_ca_certs = self.ca_certs
        try:
            # The CA bundle lives inline in this script, but the connection
            # is given a path, so spill it to a temp file that is deleted
            # when the with-block exits.
            with tempfile.NamedTemporaryFile() as fp:
                fp.write(CA_CERT)
                fp.flush()
                self.ca_certs = fp.name
                return Client.api_call(self, *args, **kwargs)
        finally:
            self.ca_certs = orig_ca_certs

    def preauth(self, username, ipaddr):
        """
        Call /rest/v1/preauth for username connecting from ipaddr.

        Returns a (result, msg) tuple. For an 'auth' result msg is a fixed
        prompt; for every other result msg is the response's 'status'.
        Raises RuntimeError when the response lacks 'result', or lacks
        'status' for a non-'auth' result.
        """
        log('pre-authentication for %s' % username)
        params = {
            'user': username,
            'ipaddr': ipaddr,
        }
        response = self.json_api_call('POST', '/rest/v1/preauth', params)
        result = response.get('result')
        if not result:
            log('invalid API response: %s' % response)
            raise RuntimeError('invalid API response: %s' % response)
        if result == API_RESULT_AUTH:
            log('secondary authentication required for user %s' % username)
            msg = 'Duo passcode or second factor:'
            return (result, msg)
        status = response.get('status')
        if not status:
            log('invalid API response: %s' % response)
            raise RuntimeError('invalid API response: %s' % response)
        msg = status
        if result == API_RESULT_ENROLL:
            log('user %s is not enrolled: %s' % (username, status))
        elif result == API_RESULT_DENY:
            log('preauth failure for %s: %s' % (username, status))
        elif result == API_RESULT_ALLOW:
            log('preauth success for %s: %s' % (username, status))
        else:
            log('unknown preauth result: %s' % result)
        return (result, msg)

    def auth(self, username, password, ipaddr):
        """
        Call /rest/v1/auth for username with factor 'auto', sending
        password as the 'auto' parameter.

        Returns a (result, status) tuple taken from the response. Raises
        RuntimeError when either field is missing or empty.
        """
        log('authentication for %s' % username)
        params = {
            'user': username,
            'factor': 'auto',
            'auto': password,
            'ipaddr': ipaddr
        }
        response = self.json_api_call('POST', '/rest/v1/auth', params)
        result = response.get('result')
        status = response.get('status')
        if not result or not status:
            log('invalid API response: %s' % response)
            raise RuntimeError('invalid API response: %s' % response)
        if result == API_RESULT_ALLOW:
            log('auth success for %s: %s' % (username, status))
        elif result == API_RESULT_DENY:
            log('auth failure for %s: %s' % (username, status))
        else:
            log('unknown auth result: %s' % result)
        return result, status
# Module-level client shared by every post_auth_cr() invocation.
api = OpenVPNIntegration(IKEY, SKEY, HOST)
# Route API traffic through the configured proxy, if one was given.
if PROXY_HOST:
    api.set_proxy(PROXY_HOST, PROXY_PORT)
def post_auth_cr(authcred, attributes, authret, info, crstate):
    """
    Post-auth hook: require a Duo push approval on top of primary auth.

    Runs Duo preauth for the user; when Duo answers 'auth' it immediately
    performs an auth call with the hard-coded "push" factor and maps the
    outcome onto authret['status'].

    Args:
      authcred: dict read for 'username' and 'client_ip_addr'.
      attributes: dict read for 'vpn_auth' and 'log_service_name'.
      authret: dict updated in place with 'status', 'reason' and
          'client_reason' when Duo does not simply allow the login.
      info: dict read for 'auth_method'.
      crstate: unused here; kept so the hook signature stays unchanged.

    Returns:
      authret alone when Duo is skipped, otherwise the tuple
      (authret, proplist_save) where proplist_save is an empty dict.
    """
    # Don't do challenge/response on sessions or autologin clients.
    # autologin client: a client that has been issued a special
    # certificate allowing authentication with only a certificate
    # (used for unattended clients such as servers).
    # session: a client that has already authenticated and received
    # a session token. The client is attempting to authenticate
    # again using the session token.
    if info.get('auth_method') in ('session', 'autologin'):
        return authret
    if SKIP_DUO_ON_VPN_AUTH and attributes.get('vpn_auth'):
        return authret
    username = authcred['username']
    ipaddr = authcred['client_ip_addr']
    proplist_save = {}
    try:
        # preauth() requires the client address as well as the username;
        # omitting it raised TypeError and failed every login.
        result, msg = api.preauth(username, ipaddr)
        if result == API_RESULT_AUTH:
            # Second factor required: go straight to a push instead of
            # issuing a challenge back to the client.
            try:
                # Hardcode "push" method
                result, msg = api.auth(username, "push", ipaddr)
                if result == API_RESULT_ALLOW:
                    authret['status'] = SUCCEED
                    authret['reason'] = msg
                else:
                    authret['status'] = FAIL
                    authret['reason'] = msg
                authret['client_reason'] = authret['reason']
            except Exception as e:
                log(traceback.format_exc())
                authret['status'] = FAIL
                authret['reason'] = "Exception caught in auth: %s" % e
                authret['client_reason'] = \
                    "Unknown error communicating with Duo service"
        elif result == API_RESULT_ENROLL:
            authret['status'] = FAIL
            # Attempt to detect whether the login came from a web client
            # or a native client
            if attributes.get('log_service_name') == 'WEB_CLIENT':
                # It's pretty reasonable to copy/paste the enrollment
                # link when it's displayed in a web client
                authret['reason'] = msg
            else:
                # Native clients tend not to be in a good position to
                # show an enrollment link (e.g. on windows, it shows
                # up in a temporary balloon popup from the
                # systray), so we'll replace it with a generic message.
                authret['reason'] = ('User account has not been '
                                     'enrolled for Duo authentication')
            authret['client_reason'] = authret['reason']
        elif result != API_RESULT_ALLOW:
            # Deny or an unrecognised result: fail closed.
            authret['status'] = FAIL
            authret['reason'] = msg
            authret['client_reason'] = authret['reason']
        # A preauth 'allow' leaves authret exactly as it was passed in.
    except Exception as e:
        log(traceback.format_exc())
        authret['status'] = FAIL
        authret['reason'] = "Exception caught in pre-auth: %s" % e
        authret['client_reason'] = \
            "Unknown error communicating with Duo service"
    return authret, proplist_save
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment