Skip to content

Instantly share code, notes, and snippets.

Last active Jan 10, 2022
What would you like to do?
Updated controller for OpenVPN with TOTP static challenge support (it runs openvpn on its own)
#!/usr/bin/env python3
Configure sudo(8) to run openvpn(8) without password prompt by adding
you ALL = NOPASSWD: /usr/sbin/openvpn
to your sudoers file. The script will try to verify your sudo(8) config.
Note: the command to run is
sudo openvpn --config myconfig.ovpn
where `myconfig.ovpn` is your actual config.
Start your OpenVPN config with
management myconfig.socket unix
lines or specify `--management` flag and leave the `client` alone.
Make sure the config has
static-challenge "Enter PIN: " 1
entries (`static-challenge` prompt and echo may differ, of course).
OpenVPN is run with only the `--config` argument if no `--management`
flag is specified!
The script will sorta validate your config and fail if it looks wrong.
Put your name, password, and TOTP secret in a file (`myconfig.ovpn.pw` here,
just next to the config file for autopickup) and make it readable by its owner
only (`chmod 0600`)! The script will check for extra perms...
Make sure you have NO `myconfig.socket` file! Any leftover is a bug.
Run this script with `myconfig.ovpn` as its positional argument. A lock file
(`myconfig.ovpn.lock` by default) prevents multiple instances from running.
You may find OpenVPN's log in `myconfig.ovpn.log` (rotated sorta daily).
import logging; log = logging.getLogger('OpenVPNctl' if __name__ == '__main__' else __name__) # noqa:E702,E501
import sys
import socket
from time import strftime, localtime, sleep, perf_counter
from datetime import timedelta
from os import chdir, rename, stat, access, X_OK, unlink
from os.path import exists, realpath, dirname, basename, isabs, getctime
from base64 import b64encode
from subprocess import Popen, DEVNULL, PIPE
from argparse import ArgumentParser
# these have to be installed with pip first
from inotify.adapters import Inotify
from pyotp import TOTP
from filelock import FileLock, Timeout as LockTimeout
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
def pin(secret):
    '''Return the TOTP code that is valid right now for *secret*.'''
    generator = TOTP(secret)
    return generator.now()
def b64(s):
    '''Return the Base64 encoding of the text *s*, as a str.'''
    raw = s.encode()
    encoded = b64encode(raw)
    return encoded.decode()
def stamp(xtime=None, format='%F %T', kill=''):
    '''Format a point in time as local-time text.

    xtime:  seconds since the epoch; None means "now"
    format: strftime format string (the default relies on the platform's
            strftime understanding %F and %T)
    kill:   substring to delete from the formatted text, e.g. '-'

    Returns the formatted string.
    '''
    text = strftime(format, localtime(xtime))
    # Replacing the empty string changes nothing, so only do it when asked
    return text.replace(kill, '') if kill else text
# NOTE(review): the body of set_logging() is not present in this copy of the
# file; only the signature (default level: logging.INFO) survived. Restore the
# body from the original script before using this module.
def set_logging(level=logging.INFO):
def wait_for_file(path):
    '''Block until the file *path* exists.

    The directory holding *path* is watched with inotify and the function
    returns (None) as soon as an IN_CREATE event names the file.
    '''
    dname = dirname(path) or '.'  # a bare file name lives in the cwd
    fname = basename(path)
    log.info('Waiting for %r in %r...', fname, dname)
    i = Inotify()
    i.add_watch(dname)  # without a watch event_gen() never yields anything
    # The file may have shown up before the watch was armed; in that case
    # no IN_CREATE event will ever arrive for it.
    if exists(path):
        log.info('File %r has been created', fname)
        return
    for e in i.event_gen(yield_nones=False):
        # print(e)  # noqa:E501 (_INOTIFY_EVENT(wd=1, mask=256, cookie=0, len=16), ['IN_CREATE'], '/tmp', 'qwe')
        if fname == e[-1] and 'IN_CREATE' in e[1]:
            log.info('File %r has been created', fname)
            return
class Control:
    'Handle AF_UNIX control socket'

    def __init__(self, socket):
        '''Remember the socket path; nothing is opened until connect().'''
        self.socket = socket  # filesystem path of the control socket
        self._s = None        # connected socket object, None when closed
        self.buffer = []      # received lines not handed out yet

    def connect(self):
        '''Open a stream connection to the socket path given to __init__.

        Raises OSError when the connection cannot be made.
        '''
        log.debug('Connecting socket')
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s.connect(self.socket)
        except OSError:
            s.close()  # do not keep a half-made socket around
            raise
        self._s = s

    def _recv(self, n=8192):
        '''Read up to *n* bytes and return them as text ('' at EOF).'''
        log.debug('accessing socket')
        r = self._s.recv(n)
        log.debug('socket read: %r', r)
        return (r or b'').decode()

    def recv(self):  # returns a line (or try to)
        '''Return the next line, terminator included, or None at EOF.

        A buffered line is handed out right away when it is terminated or
        followed by another line. Otherwise the socket is read once, an
        unfinished last line is completed with the new data, and the first
        buffered line is returned even if it is still unfinished.
        '''
        log.debug('reading socket')
        eols = ('\n', '\r')
        buf = self.buffer
        if buf and (len(buf) > 1 or buf[0].endswith(eols)):
            r = buf.pop(0)
        else:
            chunk = self._recv()
            parts = chunk.splitlines(keepends=True) if chunk else []
            # Glue new data onto an unfinished line; at EOF there is nothing
            # to glue, so `parts` must be checked before popping from it.
            if parts and buf and not buf[-1].endswith(eols):
                buf[-1] += parts.pop(0)
            buf += parts
            r = buf.pop(0) if buf else None
        log.debug('socket return: %r', r)
        return r

    def line(self, quiet=False):
        '''Return the next line without trailing whitespace (None at EOF).

        Unless *quiet* is set, a non-empty line is also logged.
        '''
        r = self.recv()
        if r:
            r = r.rstrip()
            if not quiet:
                log.info('> %s', r)
        return r

    def read_to_end(self, end='END'):
        '''Collect lines until one equal to *end* arrives (or EOF).

        Returns the list of collected lines; the *end* marker itself is not
        included.
        '''
        r = []
        while True:
            s = self.line(quiet=True)  # stripped, so it can equal `end`
            if s is None:              # EOF: do not spin forever
                break
            log.info('+ %s', s)
            if s == end:
                break
            r.append(s)
        return r

    def send(self, s, eol=b'\r\n'):
        '''Send the text *s* followed by *eol*; return socket.send()'s result.'''
        log.debug('writing socket %r', s)
        return self._s.send(s.encode() + eol)

    def close(self):
        '''Close the socket if it is open; safe to call more than once.'''
        log.debug('Closing socket')
        if self._s:
            self._s.close()
            self._s = None

    def input(self, quiet=True):
        '''Yield lines (see line()) until the peer closes the connection.'''
        while True:
            line = self.line(quiet=quiet)
            if line is None:
                break
            yield line
# Handlers for "real time messages" read from the control socket. Each
# handler is named after a message tag (with '-' written as '_') and gets the
# text after the first ':' as `text`; OvpnMgmt.run() looks handlers up by name.
# NOTE(review): this copy of the file has lost its indentation and a number of
# lines (docstring quotes, what look like `log.info(` call prefixes, some
# statement bodies). The notes below mark the gaps that are visible; restore
# them from the original script.
class OvpnMgmtRealTimeMessagesMixin:
Protocol primitives: "real time messages".
Syntax is: '>' <tag> ':' <arg> <EOL>
# True when `line` starts with '>' and contains a ':'.
# NOTE(review): there is no `self` parameter although run() calls
# self.is_rtm(line) -- presumably a @staticmethod decorator was lost; confirm.
def is_rtm(line):
return line.startswith('>') and ':' in line
# Split '>TAG:arg' into (TAG with '-' replaced by '_', arg stripped).
# NOTE(review): same missing-decorator remark as for is_rtm().
def get_tag_and_arg(line):
tag, arg = line[1:].split(':', 1)
return tag.replace('-', '_'), arg.strip()
# `text` is split on ',' into exactly two fields (bytes in, bytes out).
def BYTECOUNT(self, text):
bin, bout = text.split(',')'BYTECOUNT: in=%s out=%s', bin, bout)
# `text` is split on ',' into exactly three fields (client id, in, out).
def BYTECOUNT_CLI(self, text):
cid, bin, bout = text.split(',')'BYTECOUNT_CLI: cid=%s in=%s out=%s', cid, bin, bout)
# NOTE(review): the body of the `if` below is not present in this copy.
def CLIENT(self, text):'CLIENT: %s', text)
if not text.startswith('ADDRESS,'):
def ECHO(self, text):
'''>ECHO:{xz1},{xz2}''''ECHO: %s', text)
def FATAL(self, text):'FATAL: %s', text)
def HOLD(self, text):
'''>HOLD:Waiting for hold release:10''''HOLD: %s', text)
def INFO(self, text):'INFO: %s', text)
# `text` is 'utime,flags,message': utime is formatted with stamp() and every
# flag letter is translated to a word ('unknown ...' for unlisted letters).
def LOG(self, text):
utime, flags, text = text.split(',', 2)
timev = stamp(int(utime))
fdecoded = [{'I': 'informational',
'F': 'fatal error',
'N': 'non-fatal error',
'W': 'warning',
'D': 'debug'}.get(c, 'unknown %r' % c) for c in flags]'LOG: time=%r flags=%r: %s', timev, fdecoded, text)
# The tag is the first quoted word of the part before 'MSG:'; the reply sent
# back on the control socket is always 'cancel'.
def NEED_OK(self, text):
'''>NEED-OK:Need 'token-insertion-request' confirmation MSG:Please insert your cryptographic token''' # noqa
need, msg = text.split('MSG:', 1)
tag = need.split("'" if "'" in need else '"', 2)[1]'NEED-OK: [%s] %s // %s', tag, need, msg)
self.ctl.send('needok %s %s' % (tag, 'cancel')) # cancel every request
# Same tag extraction as NEED_OK; the tag itself is sent back as the value.
def NEED_STR(self, text):
'''>NEED-STR:Need 'name' input MSG:Please specify your name'''
need, msg = text.split('MSG:', 1)
tag = need.split("'" if "'" in need else '"', 2)[1]'NEED-STR: [%s] %s // %s', tag, need, msg)
self.ctl.send('needstr %s %s' % (tag, tag))
# Three kinds of PASSWORD text are recognised by prefix: a request for 'Auth'
# credentials with a static challenge ("... SC:"), an 'Auth-Token:' (stored in
# self.auth_token) and 'Verification Failed:'.
# NOTE(review): the action taken for the "SC:" request is missing (presumably
# a call to run_static_challenge(); confirm), and so are the lines that tie
# the last `if`, log.error() and log.warning() into branches.
def PASSWORD(self, text):
log.debug('PASSWORD(%r)', text)
if text.startswith("Need 'Auth' username/password SC:"):
text = text.split(':', 1)[-1].strip()
elif text.startswith('Auth-Token:'):
text = text.split(':', 1)[-1].strip()
self.auth_token = text'Auth token: %s', text)
elif text.startswith("Verification Failed:"):
text = text.split(':', 1)[-1].strip()'Verification failed: %s', text)
if text in ("'Auth'", "'Private Key'"):
log.error('Unhandled %r', text)
log.warning('Unhandled PASSWORD entry %r', text)
def STATE(self, text):'STATE: %s', text)
def INFOMSG(self, text):'INFOMSG: %s', text)
# Drives one OpenVPN instance through its control socket: reads lines from a
# Control object and answers credential requests using the `upfile`.
# NOTE(review): this copy of the file has lost its indentation and a number of
# lines; the notes below mark the gaps that are visible.
class OvpnMgmt(OvpnMgmtRealTimeMessagesMixin):
'' # noqa
# socket: path handed to Control(); upfile: path of the credential file.
def __init__(self, socket, upfile):
self.socket = socket
self.upfile = upfile
self.ctl = Control(socket)
# NOTE(review): the body of the `with` is missing. send_user_pass() unpacks
# the result into exactly three values (user, password, TOTP secret).
def load_up(self):
with open(self.upfile, 'rt') as f:
# Send the user name for the "Auth" request and return the reply line.
def send_user(self, user):
self.ctl.send('username "Auth" %s' % (user,))
repl = self.ctl.line(quiet=True)'+ %s', repl)
return repl
# Send password (and OTP, when given) Base64-encoded as "SCRV1:<pw>:<otp>"
# and return the reply line.
def send_pass(self, pasw, otp=None):
self.ctl.send('password "Auth" "SCRV1:%s:%s"' %
(b64(pasw), b64(otp) if otp else ''))
repl = self.ctl.line(quiet=True)'+ %s', repl)
return repl
# Send the Base64-encoded OTP as a 'cr-response' and return the reply line.
def send_cr_response(self, otp):
self.ctl.send('cr-response %s' % (b64(otp),))
repl = self.ctl.line(quiet=True)'+ %s', repl)
return repl
# NOTE(review): send_status() and send_state() only read up to 'END' here; no
# command is sent in the visible code -- presumably a self.ctl.send(...) line
# was lost from each; confirm.
def send_status(self):
return self.ctl.read_to_end()
def send_state(self):
return self.ctl.read_to_end()
# otp=True means "compute the OTP from the stored secret"; `echo`, when set,
# is printed to stderr together with the OTP.
# NOTE(review): `user` is never used in the visible code -- presumably a
# self.send_user(user) call before send_pass() was lost; confirm.
def send_user_pass(self, otp=None, echo=None):
user, pasw, secret = self.load_up()
if otp is True:
otp = pin(secret)
if echo:
print(echo, otp, file=sys.stderr)
self.send_pass(pasw, otp)
# `line` is '<echo flag>,<prompt>'; the prompt is echoed only when the flag
# is '1'.
def run_static_challenge(self, line): # "1,Please enter token PIN"
e, t = line.split(',', 1)
self.send_user_pass(otp=True, echo=t if e == '1' else None)
# Main loop: every real-time message is dispatched to the method named after
# its tag. Returns 0 after the input ends and 1 on IOError.
# NOTE(review): the `try:`/`else:` lines (and probably connect/close calls on
# self.ctl) are missing, so the two warnings below look unconditional here.
# NOTE(review): hasattr()/getattr() accept any attribute name the peer sends,
# not only message handlers -- consider restricting the set of allowed tags.
def run(self): # the only "public" method
for line in self.ctl.input():
if self.is_rtm(line):
tag, arg = self.get_tag_and_arg(line)
log.debug('tag=%r', tag)
if hasattr(self, tag): # dispatch to a method
getattr(self, tag)(arg)
log.warning('Unsupported %r in %r', tag, line)
log.warning('Unknown %r', line)'OpenVPN has shut down - exiting')
return 0
except IOError as e:
log.error('Error %s on %r', e, self.socket, exc_info=1)
return 1
def get_socket(config, default):
    '''Sanity-check an OpenVPN client config and pick the control socket.

    config:  path of the OpenVPN config file
    default: socket path that overrides the config's `management` entry,
             or a false value to use the one from the config

    The config must contain `client`, `auth-user-pass` and
    `static-challenge`; `management <path> unix` and
    `management-query-passwords` are required too unless *default* is given.

    Returns the socket path. Every problem found is logged and the process
    exits with status 1 (SystemExit) if the config is unusable.
    '''
    socket = None
    is_client = mqp = aup = scr = False
    ln = 0
    with open(config, 'rt') as f:
        for ln, line in enumerate(f, 1):
            line = line.split('#', 1)[0].strip()  # quite simplified parser...
            if not line:
                continue
            if line == 'client':
                is_client = True
                continue
            opt, *arg = line.split()
            if opt == 'management':
                if len(arg) == 2 and arg[1] == 'unix':
                    socket = arg[0]
            mqp = mqp or opt == 'management-query-passwords'
            aup = aup or opt == 'auth-user-pass'
            scr = scr or opt == 'static-challenge'
    if not is_client:
        log.error('Config %r is not for client', config)
    if default:
        # A forced socket comes with management-query-passwords as well
        if mqp:
            log.warning('Config %r has management-query-passwords entry'
                        ' and it is enforced', config)
        else:
            mqp = True
            log.info('Config %r has no management-query-passwords entry'
                     ' but it is implied', config)
        if socket:
            log.warning('Config %r has management entry for'
                        ' unix domain socket %r'
                        ' but it will be overridden with %r',
                        config, socket, default)
        else:
            log.info('Config %r has no management entry for'
                     ' unix domain socket, using %r', config, default)
        socket = default
    if not mqp:
        log.error('Config %r has no management-query-passwords entry',
                  config)
    if not socket:
        log.error('Config %r has no management entry for'
                  ' unix domain socket', config)
    if not aup:
        log.error('Config %r has no auth-user-pass entry'
                  " -- I ain't sure I can authorize you!", config)
    if not scr:
        log.error('Config %r has no static-challenge entry'
                  ' -- you do not have to use this tool!', config)
    if not (is_client and mqp and aup and scr and socket):
        log.fatal('Bad config %r', config)
        sys.exit(1)
    log.info('Using socket file %r, config of %d lines looks good to me.',
             socket, ln)
    return socket
def logrotate(logname):
    '''Move yesterday's (or older) log out of the way; return *logname*.

    When *logname* exists and the day of its ctime is before today, it is
    renamed to '<logname>.<YYYYMMDD>' of that day. A missing file only
    produces a warning.
    '''
    if not exists(logname):
        log.warning('No log file %r found', logname)
        return logname
    # '%F' with the dashes killed gives YYYYMMDD, which sorts as text
    today = stamp(format='%F', kill='-')
    ctime = stamp(getctime(logname), format='%F', kill='-')
    log.info('Log file %r of %r while now is %r', logname, ctime, today)
    if today > ctime:
        log.info('Rotating log to %r', logname + '.' + ctime)
        rename(logname, logname + '.' + ctime)
    return logname
def sudo(*av, stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL):
    '''Start `sudo` with *av* as its arguments and return the Popen object.

    By default stdin and stderr are connected to the null device and stdout
    is captured through a pipe.
    '''
    return Popen(['sudo', *av], stdin=stdin, stdout=stdout, stderr=stderr)
def start_openvpn(config, logname, mgmt, more):
    '''Start `openvpn --config <config>` through sudo.

    config:  OpenVPN config file
    logname: file that OpenVPN's stdout and stderr are appended to
    mgmt:    control socket path to force with --management, or a false
             value to rely on the config
    more:    iterable of extra options written without the leading '--',
             e.g. 'verb 4'

    Returns the Popen object of the sudo process.
    '''
    log.info('Starting OpenVPN...')
    cmd = ['openvpn', '--config', config]
    if mgmt:
        # get_socket() treats a forced socket as implying
        # management-query-passwords, so pass that option explicitly too.
        cmd += ['--management', mgmt, 'unix',
                '--management-query-passwords']
    for a in more or ():
        a = a.split()
        if not a:  # an empty --add value has no option name to prefix
            continue
        a[0] = '--' + a[0]
        cmd += a
    log.debug('start_openvpn: %r', cmd)
    # The child gets its own copies of the descriptor, so the parent's
    # handle can be closed as soon as the process has been spawned.
    with open(logname, 'a') as f:
        return sudo(*cmd, stdout=f, stderr=f)
def check_sudo_openvpn():
    '''Check that sudo(8) lets the user run openvpn without a password.

    Returns the openvpn path reported by `sudo --list openvpn`, or None when
    sudo refuses the command, the reported path is not executable, or no
    NOPASSWD rule for it is found in `sudo --list`. If `sudo --list` itself
    fails, the rule cannot be verified and the path is returned after a
    warning.
    '''
    sp = sudo('--list', 'openvpn')
    out, _ = sp.communicate()
    if sp.returncode:
        log.error('sudo(8) does not allow openvpn for you')
        return None
    lines = out.decode().splitlines()
    openvpn = lines[0].strip() if lines else ''  # tolerate empty output
    if not access(openvpn, X_OK):
        log.error('sudo(8) misconfigured for OpenVPN as %r', openvpn)
        return None
    sp = sudo('--list')
    out, _ = sp.communicate()
    if sp.returncode:  # list may be disabled...
        log.warning('Cannot verify sudo(8) config for %r', openvpn)
        return openvpn
    for line in out.decode().splitlines():
        # (root) NOPASSWD: /usr/sbin/openvpn
        if openvpn in line and 'root' in line and 'NOPASSWD:' in line:
            return openvpn  # quite primitive check...
    log.error('sudo(8) does not allow passwordless openvpn for you')
    return None
class ARGS(ArgumentParser):
    '''ArgumentParser with a chainable add() and a parse() that fills in
    per-config defaults and sets up the controller's log file.'''

    def add(self, *av, **kw):
        '''Same as add_argument(), but returns the parser for chaining.'''
        self.add_argument(*av, **kw)
        return self

    def parse(self):
        '''Parse the command line and return the argument namespace.

        Paths that were not given are derived from the config file name
        (<config>.ctl.log, <config>.log, <config>.pw, <config>.lock). The
        controller log is rotated and attached to the module logger.
        '''
        arg = self.parse_args()
        for name, suffix in (('log', '.ctl.log'),
                             ('ovpnlog', '.log'),
                             ('upfile', '.pw'),
                             ('lock', '.lock')):
            if not getattr(arg, name):
                setattr(arg, name, arg.config + suffix)
        ff = logging.Formatter(arg.format or LOG_FORMAT)
        fh = logging.FileHandler(logrotate(arg.log))
        # Without these two calls nothing is ever written to the log file
        fh.setFormatter(ff)
        log.addHandler(fh)
        log.fatal('-' * 80)  # visible separator between runs
        return arg
def get_args():
    '''Declare the command-line interface and return the parsed arguments
    (see ARGS.parse() for the defaults that are filled in).'''
    parser = ARGS(description='OpenVPN Controller')
    parser.add('config', help='OpenVPN config file')
    parser.add('--upfile',
               help='OpenVPN up file, other than <config>.pw')
    parser.add('--lock',
               help='Lock file for this controller, other than <config>.lock')
    parser.add('--log',
               help='Log file for this controller, other than <config>.ctl.log')  # noqa:E501
    # '%' has to be doubled because argparse %-formats help strings
    parser.add('--format',
               help='Log file format, other than ' + repr(LOG_FORMAT).replace('%', '%%'))  # noqa:E501
    parser.add('--ovpnlog',
               help='Log file for openvpn(8), other than <config>.log')
    parser.add('--management',
               help='Force using this socket to control OpenVPN')
    parser.add('--add', help='Add more OpenVPN options',
               action='append', default=[])
    parser.add('--debug', help='Use DEBUG log level',
               action='store_true', default=False)
    return parser.parse()
# Entry point: validates the inputs (config, credential file, sudo setup),
# starts OpenVPN under a file lock and runs the control loop. The visible
# return values are 1 for a failed check or a busy lock, otherwise RC.
# NOTE(review): this copy of the file has lost its indentation and a number of
# lines; the notes below mark the gaps that are visible.
def main():
arg = get_args()
# NOTE(review): the body of this `if` is missing (presumably it raises the
# log level to DEBUG; confirm).
if arg.debug:
if not exists(arg.config):
log.fatal('No OpenVPN config %r exists', arg.config)
return 1
if not exists(arg.upfile):
log.fatal('No credential file %r exists', arg.upfile)
return 1
# Refuse a credential file that has any group/other permission bit set.
# NOTE(review): st_mode also carries the file-type bits, so the mode printed
# in the message and in the suggested chmod will include them -- verify.
mode = stat(arg.upfile)
if mode.st_mode & 0o077:
log.fatal('Your credential file %r has mode of 0%o'
' and looks compromized (use `chmod 0%o %r` to fix)',
arg.upfile, mode.st_mode, mode.st_mode & ~0o077, arg.upfile)
return 1
# NOTE(review): the second argument and the closing parenthesis of this call
# are missing (get_socket() takes `config, default`).
socket = get_socket(arg.config, # config validation inside
openvpn = check_sudo_openvpn()
if not openvpn:
log.fatal('Your sudo(8) is not configured to run OpenVPN')
# NOTE(review): the message fused onto `return 1` below looks like a separate
# logging call whose last argument line is missing as well.
return 1'OpenVPN binary for sudo(8): %r (looks ok and configured)',
# A relative socket path is resolved to an absolute one.
# NOTE(review): the directory change announced by the message is not visible
# (presumably chdir(dname); confirm).
if not isabs(socket):
dname = dirname(realpath(arg.config))'Changind directory to %r', dname)
socket = realpath(socket)
# NOTE(review): the `try:` and the removal of the stale socket that belong to
# the `except` below are missing.
if exists(socket):
except IOError as e:
log.error('Cannot remove stale socket %r: %s', socket, e)
RC = 0
# Only one controller per lock file: the lock is tried for 0.1 s.
# NOTE(review): the `try:` lines matching the two `except` clauses below, the
# middle arguments of start_openvpn() and the body of `if not exists(socket):`
# (presumably wait_for_file(socket); confirm) are missing.
with FileLock(arg.lock, timeout=0.1):'Lock aquired')
sp = start_openvpn(arg.config,
arg.add)'OpenVPN is running PID %r',
if not exists(socket):
RC = OvpnMgmt(socket, arg.upfile).run()
except KeyboardInterrupt:
RC = 0
rc = sp.wait()'OpenVPN exited with %r', rc)'Lock released')
except LockTimeout:
log.fatal('Locked out')
return 1
return RC
class Timed:
    '''Context manager that logs 'start' on entry and 'stop' with the
    elapsed time on exit.

    logger: logger to report to; the module logger is used when omitted.
    After the block, `t1` and `t2` hold the perf_counter() readings taken
    on entry and exit.
    '''

    def __init__(self, logger=None):
        self.logger = logger or None

    def __enter__(self):
        self.t1 = perf_counter()
        (self.logger or log).info('start')
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.t2 = perf_counter()
        # SystemExit is a normal way to leave, not a crash
        crashed = exc_value and exc_type is not SystemExit
        (self.logger or log).info('stop (%s elapsed)%s',
                                  timedelta(seconds=self.t2 - self.t1),
                                  f' crashed {exc_type}' if crashed else '')
        # A false return value lets the interpreter re-raise the exception
        # itself; __exit__ is not supposed to raise the one it was passed.
        return False
# Script entry: log to the console at INFO level and time the whole run.
# NOTE(review): the body of the `with` below is missing in this copy
# (presumably it runs main() and exits with its return value; confirm).
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, datefmt='%F %T', format=LOG_FORMAT)
with Timed():
# vim:set ft=python ai et ts=4 sts=4 sw=4 cc=80:EOF #
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment