Created
May 18, 2022 09:24
-
-
Save schwabe/2d412ae9236888b398063317ed6a9be4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python3 | |
import sys | |
import random | |
import os | |
from base64 import standard_b64decode | |
import time | |
import pyotp | |
import pprint | |
""" | |
apt install openvpn | |
git clone https://github.com/schwabe/openvpn | |
sudo apt install libssl-dev liblz4-dev libcmocka-dev liblzo2-dev pkg-config libsystemd-dev automake autoconf libtool build-essential | |
cd openvpn | |
autoreconf -vfi | |
./configure --disable-plugin-auth-pam --enable-systemd | |
follow https://github.com/schwabe/openvpn/blob/dco/doc/man-sections/example-fingerprint.rst | |
apt install python3-pyotp | |
make && cp src/openvpn/openvpn /usr/sbin/openvpn | |
add to server.conf | |
client-crresponse /etc/openvpn/testauth.py | |
auth-user-pass-verify /etc/openvpn/testauth.py via-file | |
Usernames: | |
if a username is prefixed with url it uses the old OPEN_URL method (e.g. urlrick) | |
- fail: sends AUTH_FAILED | |
- failtemp: sends AUTH_FAILED,TEMP:Go away for a while | |
- failtemp42: sends AUTH_FAILED,TEMP and backoff for 42s | |
- failtempnr: sends AUTH_FAILED,TEMP and instructs to go the next remote | |
- success: sends AUTH_SUCCESS | |
- faildelay: sends AUTH_FAILED after 20s | |
- successdelay: sends AUTH_SUCCESS after 20s | |
- rick: sends you to a certain youtube site and auth you | |
- openvpn: sends you to openvpn.com | |
- totp: Does crtext based 2FA with secret "OS6JDNRK2BNUPQVX" | |
- openvpnhidden: send you to https://openvpn.com/?OVPN_WEBVIEW_HIDDEN=1 | |
- googlehidden: sends you to https://www.google.com/search?q=hidden&OVPN_WEBVIEW_HIDDEN=1 (OPEN_URL) or uses hidden flags with WEBAUTH | |
- google: sends you to https://www.google.com/search?q=hidden without any flags | |
- googlext: sends you to https://www.google.com/search?q=external with external flags and WEBAUTH | |
""" | |
def main(): | |
# Get common name and script type from environment | |
script_type = os.environ['script_type'] | |
cn = os.environ['common_name'] | |
with open(f"/tmp/crauth-{script_type}.txt", 'w') as out: | |
out.write(pprint.pformat(dict(os.environ))) | |
#.write(f"\nCN: {cn}\n") | |
if script_type == 'user-pass-verify': | |
# 0 == ok, 1 == fail, 2 deferred | |
# signal text based challenge response | |
pwfile = open(sys.argv[1]) | |
username = pwfile.readline()[:-1] | |
password = pwfile.readline()[:-1] | |
# sucess | |
ret = 0 | |
retdef = 0 | |
if username.startswith("url"): | |
username = username[3:] | |
method = "openurl" | |
webauth = "OPEN_URL:" | |
else: | |
method = "webauth" | |
webauth = "WEB_AUTH::" | |
if username == 'fail' or 'UV_FAIL' in os.environ: | |
ret = 1 | |
elif username == 'failtemp': | |
write_auth_failed_reason("TEMP:Go away for a while") | |
ret = 1 | |
elif username == 'failtemp42': | |
write_auth_failed_reason("TEMP[backoff 42]:Go away for a 42s") | |
ret = 1 | |
elif username == 'failtempnr': | |
write_auth_failed_reason("TEMP[backoff 11,advance remote]:Annoy the next server!") | |
ret = 1 | |
elif username == 'success': | |
ret = 0 | |
elif username == 'faildelay': | |
ret = 2 | |
retdef = 0 | |
write_auth_delayed(0, 20) | |
elif username == 'successdelay': | |
ret = 2 | |
write_auth_delayed(1, 20) | |
elif username.startswith('totp'): | |
extra = f"CR_TEXT:E,R:Please enter your TOTP code, {username}!" | |
write_auth_pending(90, 'crtext', extra) | |
ret = 2 | |
elif username == 'rick': | |
ret = 2 | |
write_auth_pending(180, method, webauth + "https://www.youtube.com/watch?v=dQw4w9WgXcQ") | |
write_auth_delayed(1, 1) | |
elif username == 'openvpn': | |
ret = 2 | |
write_auth_pending(180, method, webauth + "https://openvpn.com") | |
write_auth_delayed(1, 30) | |
elif username == 'googlehidden': | |
ret = 2 | |
if method == "webauth": | |
write_auth_pending(180, method, "WEB_AUTH:hidden:https://www.google.com/search?q=hidden") | |
else: | |
write_auth_pending(180, method, "OPEN_URL:https://www.google.com/search?q=hidden&OVPN_WEBVIEW_HIDDEN=1") | |
write_auth_delayed(0, 30) | |
elif username == 'googleext': | |
ret = 2 | |
write_auth_pending(180, method, "WEB_AUTH:external:https://www.google.com/search?q=external") | |
write_auth_delayed(0, 30) | |
elif username == 'google': | |
ret = 2 | |
write_auth_pending(180, method, "WEB_AUTH::https://www.google.com/search?q=external") | |
write_auth_delayed(0, 30) | |
elif username == 'openvpnhidden': | |
ret = 2 | |
if method == "webauth": | |
write_auth_pending(180, method, "OPEN_URL:https://openvpn.com?OVPN_WEBVIEW_HIDDEN=1") | |
else: | |
write_auth_pending(180, method, "WEB_AUTH:hidden:https://openvpn.com?OVPN_WEBVIEW_HIDDEN=1") | |
write_auth_delayed(1, 30) | |
elif username == 'special': | |
extra = f"CR_TEXT:E:This special username is known, doing nothing, really" | |
write_auth_pending(90, 'crtext', extra) | |
ret = 2 | |
write_auth_delayed(0, 7) | |
else: | |
ret =1 | |
write_auth_failed_reason("This special username is known, doing nothing, really") | |
sys.exit(ret) | |
elif script_type == 'client-crresponse': | |
response = None | |
# Read the crresponse from the argument file | |
# and convert it into text. A failure because of bad user | |
# input (e.g. invalid base64) will make the script throw | |
# an error and make OpenVPN return AUTH_FAILED | |
with open(sys.argv[1], 'r') as crinput: | |
response = crinput.read() | |
response = standard_b64decode(response) | |
response = response.decode().strip() | |
secret = "OS6JDNRK2BNUPQVX" | |
totp = pyotp.TOTP(secret) | |
with open(f"/tmp/crauth-{script_type}.txt", 'a') as out: | |
out.write(f"\nCR: expected totp '{totp.now()}', response was '{response}'\n") | |
# Check if the code is valid (and also allow code +/-1) | |
if totp.verify(response, valid_window=1): | |
write_auth_control(1) | |
else: | |
write_auth_control(0) | |
else: | |
print(f"Unknown script type {script_type}") | |
sys.exit(1) | |
def write_auth_delayed(status, delay): | |
if os.fork() == 0: | |
# Child | |
time.sleep(delay) | |
write_auth_control(status) | |
def write_auth_control(status): | |
with open(os.environ['auth_control_file'], 'w') as auth_control: | |
auth_control.write("%d" % status) | |
def write_auth_pending(timeout, method, extra): | |
with open(os.environ['auth_pending_file'], 'w') as auth_pending: | |
auth_pending.write("%d\n%s\n%s" % (timeout, method, extra)) | |
def write_auth_failed_reason(reason): | |
with open(os.environ['auth_failed_reason_file'], 'w') as auth_failed: | |
auth_failed.write("%s" % reason) | |
if __name__=='__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment