Last active
August 29, 2015 13:56
-
-
Save fliphess/9123388 to your computer and use it in GitHub Desktop.
openvpn htaccess authenthicator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Thanks to https://github.com/tomekwojcik/flask-htauth/blob/master/flask_htauth/htpasswd.py | |
import argparse | |
import os | |
import sys | |
import base64 | |
import codecs | |
import crypt | |
from hashlib import md5, sha1 | |
import logging | |
import logging.handlers | |
def logger(): | |
log = logging.getLogger(name='openvpn-htaccess-authenthicator') | |
log.setLevel(level=logging.INFO) | |
fh = logging.FileHandler('/var/log/openvpn/openvpn-auth.log') | |
fh.setLevel(logging.INFO) | |
formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)-7s %(message)s") | |
fh.setFormatter(formatter) | |
log.addHandler(fh) | |
return log | |
def parse_options(): | |
parser = argparse.ArgumentParser(description=""" | |
Tue allmighty htaccess vpn authenthicator | |
Verify's the openpn's connecting user's password for validity. | |
openvpn authenthicator - Flip Hess 2014 - <flip@fliphess.com> | |
""") | |
parser.add_argument('-f', '--file', help="The authfile with users", type=str, required=True) | |
arguments = vars(parser.parse_args()) | |
return arguments | |
# From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/325204 | |
def apache_md5crypt(password, salt, magic='$apr1$'): | |
""" | |
Calculates the Apache-style MD5 hash of a password | |
""" | |
password = password.encode('utf-8') | |
salt = salt.encode('utf-8') | |
magic = magic.encode('utf-8') | |
m = md5() | |
m.update(password + magic + salt) | |
mixin = md5(password + salt + password).digest() | |
for i in range(0, len(password)): | |
m.update(mixin[i % 16]) | |
i = len(password) | |
while i: | |
if i & 1: | |
m.update('\x00') | |
else: | |
m.update(password[0]) | |
i >>= 1 | |
final = m.digest() | |
for i in range(1000): | |
m2 = md5() | |
if i & 1: | |
m2.update(password) | |
else: | |
m2.update(final) | |
if i % 3: | |
m2.update(salt) | |
if i % 7: | |
m2.update(password) | |
if i & 1: | |
m2.update(final) | |
else: | |
m2.update(password) | |
final = m2.digest() | |
itoa64 = './0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' | |
rearranged = '' | |
seq = ((0, 6, 12), (1, 7, 13), (2, 8, 14), (3, 9, 15), (4, 10, 5)) | |
for a, b, c in seq: | |
v = ord(final[a]) << 16 | ord(final[b]) << 8 | ord(final[c]) | |
for i in range(4): | |
rearranged += itoa64[v & 0x3f] | |
v >>= 6 | |
v = ord(final[11]) | |
for i in range(2): | |
rearranged += itoa64[v & 0x3f] | |
v >>= 6 | |
return magic + salt + '$' + rearranged | |
def check_password(password, pw_hash): | |
if pw_hash.startswith('$apr1$'): | |
salt = pw_hash.strip('$').split('$')[1] | |
check = apache_md5crypt(password, salt) | |
elif pw_hash.startswith('{SHA}'): | |
_hash = sha1(password.encode('utf-8')).digest() | |
check = '{SHA}' + base64.b64encode(_hash) | |
else: | |
check = crypt.crypt(password.encode('utf-8'), pw_hash) | |
return check == pw_hash | |
def read_htpasswd(filename): | |
with codecs.open(filename, 'r', 'utf-8') as f: | |
users = {} | |
for line in f.readlines(): | |
line = line.strip() | |
if not line or line.startswith('#'): | |
continue | |
try: | |
username, password = line.split(':', 1) | |
except ValueError: | |
continue | |
users[username] = password | |
return users | |
def main(): | |
log = logger() | |
args = parse_options() | |
username = os.environ.get('username', None) | |
password = os.environ.get('password', None) | |
ip = os.environ.get('untrusted_ip', 'unknown') | |
if not username or not password: | |
log.info('Failed login attempt for user: %s ip: %s - Setup Error: broken environment' % (username, ip)) | |
sys.exit(1) | |
try: | |
users = read_htpasswd(args['file']) | |
except Exception as e: | |
log.warning('Failed to read passwd file! Error was: %s' % e) | |
log.info('Failed login attempt for user: %s ip: %s - Canceled by exception' % (username, ip)) | |
sys.exit(1) | |
if not username in users: | |
log.info('Failed login attempt for user: %s ip: %s - Unknown user' % (username, ip)) | |
sys.exit(1) | |
pw_hash = users[username] | |
try: | |
if check_password(password=password, pw_hash=pw_hash): | |
log.info('Successfull login for user: %s ip: %s' % (username, ip)) | |
sys.exit(0) | |
else: | |
log.info('Failed login attempt for user: %s ip: %s - incorrect password' % (username, ip)) | |
sys.exit(1) | |
except Exception as e: | |
log.warning('Failed to verify user credentials! Error was: %s' % e) | |
log.info('Failed login attempt for user: %s ip: %s - Canceled by crypto exception' % (username, ip)) | |
sys.exit(1) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment