Skip to content

Instantly share code, notes, and snippets.

@jn0
Last active January 10, 2022 07:36
Show Gist options
  • Save jn0/2c906549788ff94eef0360eb38367cdc to your computer and use it in GitHub Desktop.
Save jn0/2c906549788ff94eef0360eb38367cdc to your computer and use it in GitHub Desktop.
Updated controller for OpenVPN with TOTP static challenge support (it runs openvpn on its own)
#!/usr/bin/env python3
'''
Configure sudo(8) to run openvpn(8) without password prompt by adding
you ALL = NOPASSWD: /usr/sbin/openvpn
to your sudoers file. The script will try to verify your sudo(8) config.
Note: the command to run is
sudo openvpn --config myconfig.ovpn
where `myconfig.ovpn` is your actual config.
Start your OpenVPN config with
client
management myconfig.socket unix
management-query-passwords
lines or specify `--management` flag and leave the `client` alone.
Make sure the config has
static-challenge "Enter PIN: " 1
and
auth-user-pass
entries (`static-challenge` prompt and echo may differ, of course).
The OpenVPN is being ran with the only `--config` argument if no `--management`
flag is specified!
The script will sorta validate your config and fail if it looks wrong.
Put your name, password, and TOTP secret to a file (`myconfig.ovpn.pw` here,
just next to the config file for autopickup) and make it user-only readable
(`chmod 0600 myconfig.ovpn.pw`)! The script will check for extra perms...
Make sure you have NO `myconfig.socket` file! Any leftover is a bug.
Run this script as `ovpnctl.py myconfig.ovpn`. The config is locked preventing
multiple instances from being ran.
You may find OpenVPN's log in `myconfig.ovpn.log` (rotated sorta daily).
'''
import logging; log = logging.getLogger('OpenVPNctl' if __name__ == '__main__' else __name__) # noqa:E702,E501
import sys
import socket
from time import strftime, localtime, sleep, perf_counter
from datetime import timedelta
from os import chdir, rename, stat, access, X_OK, unlink
from os.path import exists, realpath, dirname, basename, isabs, getctime
from base64 import b64encode
from subprocess import Popen, DEVNULL, PIPE
from argparse import ArgumentParser
# one have to pip install them first
from inotify.adapters import Inotify
from pyotp import TOTP
from filelock import FileLock, Timeout as LockTimeout
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
def pin(secret): return TOTP(secret).now()
def b64(s): return b64encode(s.encode()).decode()
def stamp(xtime=None, format='%F %T', kill=''):
return strftime(format, localtime(xtime)).replace(kill, '')
def set_logging(level=logging.INFO):
logging.root.setLevel(level)
def wait_for_file(path):
dname = dirname(path)
fname = basename(path)
log.info('Waiting for %r in %r...', fname, dname)
i = Inotify()
i.add_watch(dname)
for e in i.event_gen(yield_nones=False):
# print(e) # noqa:E501 (_INOTIFY_EVENT(wd=1, mask=256, cookie=0, len=16), ['IN_CREATE'], '/tmp', 'qwe')
if fname == e[-1] and 'IN_CREATE' in e[1]:
log.info('File %r has been created', fname)
break
i.remove_watch(dname)
class Control:
'Handle AF_UNIX control socket'
def __init__(self, socket):
self.socket = socket
self._s = None
self.buffer = []
def connect(self):
log.debug('Connecting socket')
self._s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self._s.connect(self.socket)
def _recv(self, n=8192):
log.debug('accessing socket')
r = self._s.recv(n)
log.debug('socket read: %r', r)
return (r or b'').decode()
def recv(self): # returns a line (or try to)
log.debug('reading socket')
r = None
if self.buffer:
if len(self.buffer) > 1:
r = self.buffer.pop(0)
else:
if self.buffer[0].endswith(('\n', '\r')):
r = self.buffer.pop(0)
if r is None:
r = self._recv()
r = r.splitlines(keepends=True) if r else []
if (not self.buffer) or self.buffer[-1].endswith(('\n', '\r')):
self.buffer += r # all lines in buffer are "complete"
else:
self.buffer[-1] += r.pop(0) # fill incomplete line
self.buffer += r
r = self.buffer.pop(0) if self.buffer else None
log.debug('socket return: %r', r)
return r
def line(self, quiet=False):
r = self.recv()
if r:
r = r.rstrip()
if not quiet:
log.info('> %s', r)
return r
def read_to_end(self, end='END'):
r = []
ok = False
while not ok:
s = self.recv()
log.info('+ %s', s)
r.append(s)
ok = ok or (s == end)
return r
def send(self, s, eol=b'\r\n'):
log.debug('writing socket %r', s)
return self._s.send(s.encode() + eol)
def close(self):
log.debug('Closing socket')
if self._s:
self._s.close()
self._s = None
def input(self, quiet=True):
while True:
line = self.line(quiet=quiet)
if line is None:
break
yield line
self.close()
class OvpnMgmtRealTimeMessagesMixin:
'''\
Protocol primitives: "real time messages".
Syntax is: '>' <tag> ':' <arg> <EOL>
'''
@staticmethod
def is_rtm(line):
return line.startswith('>') and ':' in line
@staticmethod
def get_tag_and_arg(line):
tag, arg = line[1:].split(':', 1)
return tag.replace('-', '_'), arg.strip()
def BYTECOUNT(self, text):
'''>BYTECOUNT:{BYTES_IN},{BYTES_OUT}'''
bin, bout = text.split(',')
log.info('BYTECOUNT: in=%s out=%s', bin, bout)
def BYTECOUNT_CLI(self, text):
'''>BYTECOUNT_CLI:{CID},{BYTES_IN},{BYTES_OUT}'''
cid, bin, bout = text.split(',')
log.info('BYTECOUNT_CLI: cid=%s in=%s out=%s', cid, bin, bout)
def CLIENT(self, text):
log.info('CLIENT: %s', text)
if not text.startswith('ADDRESS,'):
self.ctl.read_to_end('>CLIENT:ENV,END')
def ECHO(self, text):
'''>ECHO:{xz1},{xz2}'''
log.info('ECHO: %s', text)
def FATAL(self, text):
log.info('FATAL: %s', text)
def HOLD(self, text):
'''>HOLD:Waiting for hold release:10'''
log.info('HOLD: %s', text)
def INFO(self, text):
log.info('INFO: %s', text)
def LOG(self, text):
'''>LOG:{time},{flags},{text}'''
utime, flags, text = text.split(',', 2)
timev = stamp(int(utime))
fdecoded = [{'I': 'informational',
'F': 'fatal error',
'N': 'non-fatal error',
'W': 'warning',
'D': 'debug'}.get(c, 'unknown %r' % c) for c in flags]
log.info('LOG: time=%r flags=%r: %s', timev, fdecoded, text)
def NEED_OK(self, text):
'''>NEED-OK:Need 'token-insertion-request' confirmation MSG:Please insert your cryptographic token''' # noqa
need, msg = text.split('MSG:', 1)
tag = need.split("'" if "'" in need else '"', 2)[1]
log.info('NEED-OK: [%s] %s // %s', tag, need, msg)
self.ctl.send('needok %s %s' % (tag, 'cancel')) # cancel every request
def NEED_STR(self, text):
'''>NEED-STR:Need 'name' input MSG:Please specify your name'''
need, msg = text.split('MSG:', 1)
tag = need.split("'" if "'" in need else '"', 2)[1]
log.info('NEED-STR: [%s] %s // %s', tag, need, msg)
self.ctl.send('needstr %s %s' % (tag, tag))
def PASSWORD(self, text):
log.debug('PASSWORD(%r)', text)
if text.startswith("Need 'Auth' username/password SC:"):
text = text.split(':', 1)[-1].strip()
self.run_static_challenge(text)
elif text.startswith('Auth-Token:'):
text = text.split(':', 1)[-1].strip()
self.auth_token = text
log.info('Auth token: %s', text)
elif text.startswith("Verification Failed:"):
text = text.split(':', 1)[-1].strip()
log.info('Verification failed: %s', text)
if text in ("'Auth'", "'Private Key'"):
log.fatal('Oops.')
else:
log.error('Unhandled %r', text)
else:
log.warning('Unhandled PASSWORD entry %r', text)
def STATE(self, text):
log.info('STATE: %s', text)
def INFOMSG(self, text):
log.info('INFOMSG: %s', text)
class OvpnMgmt(OvpnMgmtRealTimeMessagesMixin):
'https://github.com/OpenVPN/openvpn/blob/42f6063f611b75a07c266061dc6a07f6dcfe372c/doc/management-notes.txt' # noqa
def __init__(self, socket, upfile):
self.socket = socket
self.upfile = upfile
self.ctl = Control(socket)
def load_up(self):
with open(self.upfile, 'rt') as f:
return f.read().strip().split()
def send_user(self, user):
self.ctl.send('username "Auth" %s' % (user,))
repl = self.ctl.line(quiet=True)
log.info('+ %s', repl)
return repl
def send_pass(self, pasw, otp=None):
self.ctl.send('password "Auth" "SCRV1:%s:%s"' %
(b64(pasw), b64(otp) if otp else ''))
repl = self.ctl.line(quiet=True)
log.info('+ %s', repl)
return repl
def send_cr_response(self, otp):
self.ctl.send('cr-response %s' % (b64(otp),))
repl = self.ctl.line(quiet=True)
log.info('+ %s', repl)
return repl
def send_status(self):
self.ctl.send('status')
return self.ctl.read_to_end()
def send_state(self):
self.ctl.send('state')
return self.ctl.read_to_end()
def send_user_pass(self, otp=None, echo=None):
user, pasw, secret = self.load_up()
self.send_user(user)
if otp is True:
otp = pin(secret)
if echo:
print(echo, otp, file=sys.stderr)
self.send_pass(pasw, otp)
def run_static_challenge(self, line): # "1,Please enter token PIN"
e, t = line.split(',', 1)
self.send_user_pass(otp=True, echo=t if e == '1' else None)
def run(self): # the only "public" method
try:
self.ctl.connect()
for line in self.ctl.input():
if self.is_rtm(line):
tag, arg = self.get_tag_and_arg(line)
log.debug('tag=%r', tag)
if hasattr(self, tag): # dispatch to a method
getattr(self, tag)(arg)
else:
log.warning('Unsupported %r in %r', tag, line)
else:
log.warning('Unknown %r', line)
log.info('OpenVPN has shut down - exiting')
return 0
except IOError as e:
log.error('Error %s on %r', e, self.socket, exc_info=1)
return 1
finally:
self.ctl.close()
def get_socket(config, default):
socket = None
with open(config, 'rt') as f:
is_client = mqp = aup = scr = False
ln = 0
for line in f:
ln += 1
line = line.split('#', 1)[0].strip() # quite simplified parser...
if not line:
continue
if line == 'client':
is_client = True
continue
_ = line.split()
opt, arg = _[0], _[1:]
if opt == 'management':
if len(arg) == 2 and arg[1] == 'unix':
socket = arg[0]
mqp = mqp or opt == 'management-query-passwords'
aup = aup or opt == 'auth-user-pass'
scr = scr or opt == 'static-challenge'
if not is_client:
log.error('Config %r is not for client', config)
if default:
if mqp:
log.warning('Config %r has management-query-passwords entry'
' and it is enforced', config)
else:
mqp = True
log.info('Config %r has no management-query-passwords entry'
' but it is implied', config)
if socket:
log.warning('Config %r has management entry for'
' unix domain socket %r'
' but it will be overridden with %r',
config, socket, default)
else:
log.info('Config %r has no management entry for'
' unix domain socket, using %r', config, default)
socket = default
else:
if not mqp:
log.error('Config %r has no management-query-passwords entry',
config)
if not socket:
log.error('Config %r has no management entry for'
' unix domain socket', config)
if not aup:
log.error('Config %r has no auth-user-pass entry'
" -- I ain't sure I can authorize you!", config)
if not scr:
log.error('Config %r has no static-challenge entry'
' -- you do not have to use this tool!', config)
if not (is_client and mqp and aup and scr and socket):
log.fatal('Bad config %r', config)
sys.exit(1)
log.info('Using socket file %r, config of %d lines looks good to me.',
socket, ln)
return socket
def logrotate(logname):
if exists(logname):
today = stamp(format='%F', kill='-')
ctime = stamp(getctime(logname), format='%F', kill='-')
log.info('Log file %r of %r while now is %r', logname, ctime, today)
if today > ctime:
log.info('Rotating log to %r', logname + '.' + ctime)
rename(logname, logname + '.' + ctime)
else:
log.warning('No log file %r found', logname)
return logname
def sudo(*av, stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL):
return Popen(['sudo'] + list(av),
stdin=stdin, stdout=stdout, stderr=stderr)
def start_openvpn(config, logname, mgmt, more):
log.info('Starting OpenVPN...')
f = open(logname, 'a')
cmd = ['openvpn', '--config', config]
if mgmt:
cmd += ['--management', mgmt, 'unix',
'--management-query-passwords']
if more:
for a in more:
a = a.split()
a[0] = '--' + a[0]
cmd += a
log.debug('start_openvpn: %r', cmd)
return sudo(*cmd, stdout=f, stderr=f)
def check_sudo_openvpn():
sp = sudo('--list', 'openvpn')
out, _ = sp.communicate()
rc = sp.wait()
if rc:
log.error('sudo(8) does not allow openvpn for you')
return None
openvpn = out.decode().splitlines()[0].strip()
if not access(openvpn, X_OK):
log.error('sudo(8) misconfigured for OpenVPN as %r', openvpn)
return None
sp = sudo('--list')
out, _ = sp.communicate()
rc = sp.wait()
if rc: # list may be disabled...
log.warning('Cannot verify sudo(8) config for %r', openvpn)
else:
ok = False
for line in out.decode().splitlines():
# (root) NOPASSWD: /usr/sbin/openvpn
if openvpn in line and 'root' in line and 'NOPASSWD:' in line:
ok = True # quite primitive check...
break
if not ok:
log.error('sudo(8) does not allow passwordless openvpn for you')
return None
return openvpn
class ARGS(ArgumentParser):
def add(self, *av, **kw):
self.add_argument(*av, **kw)
return self
def parse(self):
arg = self.parse_args()
if not arg.log:
arg.log = arg.config + '.ctl.log'
if not arg.ovpnlog:
arg.ovpnlog = arg.config + '.log'
if not arg.upfile:
arg.upfile = arg.config + '.pw'
if not arg.lock:
arg.lock = arg.config + '.lock'
ff = logging.Formatter(arg.format or LOG_FORMAT)
fh = logging.FileHandler(logrotate(arg.log))
fh.setFormatter(ff)
log.addHandler(fh)
log.fatal('-' * 80)
return arg
def get_args():
return ARGS(
description='OpenVPN Controller',
fromfile_prefix_chars='@',
).add('config', help='OpenVPN config file'
).add('--upfile', help='OpenVPN up file, other than <config>.pw' # noqa:E124,E501
).add('--lock', help='Lock file for this controller, other than <config>.lock' # noqa:E124,E501
).add('--log', help='Log file for this controller, other than <config>.ctl.log' # noqa:E124,E501
).add('--format', help='Log file format, other than ' + repr(LOG_FORMAT).replace('%', '%%') # noqa:E124,E501
).add('--ovpnlog', help='Log file for openvpn(8), other than <config>.log' # noqa:E124,E501
).add('--management', help='Force using this socket to control OpenVPN' # noqa:E124,E501
).add('--add', help='Add more OpenVPN options', action='append', default=[] # noqa:E124,E501
).add('--debug', help='Use DEBUG log level', action='store_true', default=False # noqa:E124,E501
).parse() # noqa:E124,E501
def main():
arg = get_args()
if arg.debug:
set_logging(logging.DEBUG)
if not exists(arg.config):
log.fatal('No OpenVPN config %r exists', arg.config)
return 1
if not exists(arg.upfile):
log.fatal('No credential file %r exists', arg.upfile)
return 1
mode = stat(arg.upfile)
if mode.st_mode & 0o077:
log.fatal('Your credential file %r has mode of 0%o'
' and looks compromized (use `chmod 0%o %r` to fix)',
arg.upfile, mode.st_mode, mode.st_mode & ~0o077, arg.upfile)
return 1
socket = get_socket(arg.config, arg.management) # config validation inside
openvpn = check_sudo_openvpn()
if not openvpn:
log.fatal('Your sudo(8) is not configured to run OpenVPN')
return 1
log.info('OpenVPN binary for sudo(8): %r (looks ok and configured)',
openvpn)
if not isabs(socket):
dname = dirname(realpath(arg.config))
log.info('Changind directory to %r', dname)
chdir(dname)
socket = realpath(socket)
if exists(socket):
try:
unlink(socket)
except IOError as e:
log.error('Cannot remove stale socket %r: %s', socket, e)
RC = 0
try:
with FileLock(arg.lock, timeout=0.1):
log.info('Lock aquired')
sp = start_openvpn(arg.config,
logrotate(arg.ovpnlog),
arg.management,
arg.add)
log.info('OpenVPN is running PID %r', sp.pid)
if not exists(socket):
wait_for_file(socket)
sleep(0.1)
try:
RC = OvpnMgmt(socket, arg.upfile).run()
except KeyboardInterrupt:
log.fatal('Interrupted')
RC = 0
rc = sp.wait()
log.info('OpenVPN exited with %r', rc)
log.info('Lock released')
unlink(arg.lock)
except LockTimeout:
log.fatal('Locked out')
return 1
return RC
class Timed:
def __init__(self, logger=None):
self.logger = logger or None
def __enter__(self):
self.t1 = perf_counter()
(self.logger or log).info('start')
return self
def __exit__(self, exc_type, exc_value, traceback):
self.t2 = perf_counter()
(self.logger or log).info('stop (%s elapsed)%s',
timedelta(seconds=self.t2 - self.t1),
f' crashed {exc_type}'
if exc_value and exc_type is not SystemExit
else '')
if exc_value:
raise exc_value
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, datefmt='%F %T', format=LOG_FORMAT)
with Timed():
sys.exit(main())
# vim:set ft=python ai et ts=4 sts=4 sw=4 cc=80:EOF #
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment