Skip to content

Instantly share code, notes, and snippets.

@chrismeyersfsu
Created March 13, 2023 02:48
Show Gist options
  • Save chrismeyersfsu/1514143205356e2f2255ae83bbb8c9c6 to your computer and use it in GitHub Desktop.
Save chrismeyersfsu/1514143205356e2f2255ae83bbb8c9c6 to your computer and use it in GitHub Desktop.
Vpn-creds
#!/usr/local/bin/python3.8
import socket
import select
import fcntl
import os
from collections.abc import Generator
import sys
import pyotp
import time
# Maximum number of bytes requested from the socket per recv() call.
READ_SIZE = 1024

# Credentials sent in reply to an OpenVPN auth request (placeholders; edit
# before use).
USERNAME = 'vpn_username_here'
TOTP_TOKEN = 'your_token_secret_here'

# Handed to select.poll().poll() by the message reader, so the unit is
# milliseconds.
SELECT_TIMEOUT = 1000

# Verbosity thresholds: a higher number means more output.
DEBUG_LEVELS = {
    'info': 1,
    'debug': 2,
}

# Active verbosity; printd()/printi() compare against this value.
DEBUG_LEVEL = DEBUG_LEVELS['info']
def printd(*args, **kwargs):
    """Forward the arguments to print() only at 'debug' verbosity or above."""
    debug_threshold = DEBUG_LEVELS['debug']
    if DEBUG_LEVEL < debug_threshold:
        return
    print(*args, **kwargs)
def printi(*args, **kwargs):
    """Forward the arguments to print() only at 'info' verbosity or above."""
    info_threshold = DEBUG_LEVELS['info']
    if DEBUG_LEVEL < info_threshold:
        return
    print(*args, **kwargs)
"""
Connect to the OpenVPN client management socket and read messages until an
auth request is found; reply with the username and a freshly generated TOTP token.
If nothing interesting happens, disconnect and reconnect.
The constant disconnect/reconnect works around the fact that pfSense + OpenVPN
supports only one client on the management socket at a time. The pfSense system
reads the status of the connection via this same socket, so our disconnect/connect
looping gives the status query a turn.
"""
class messages(Generator):
    """Iterator over newline-terminated messages read from a socket.

    Received bytes are buffered and split on newline bytes; each complete
    line is decoded to ``str`` and produced in the order it arrived.
    Iteration ends (``StopIteration``) as soon as reading fails: the poll
    timed out, the peer closed the connection, or the socket reported an
    error.

    Args:
        sock: Socket to read from.
        timeout: Maximum time to wait for the socket to become readable,
            passed straight to ``select.poll().poll()`` (milliseconds).
    """

    def __init__(self, sock, timeout):
        self.sock = sock
        self.timeout = timeout
        # Bytes received so far that do not yet form a complete line.
        self.data = b''
        # Complete lines (still bytes, newline stripped), oldest first.
        self.msgs = []

    def read_msg(self):
        """Wait for the socket to become readable and buffer one chunk.

        Raises:
            RuntimeError: if the poll times out, ``recv()`` fails, the peer
                has closed the connection, or the poll reports a
                hang-up/error.  The socket is closed here in the
                peer-closed and hang-up/error cases.
        """
        poller = select.poll()
        read_only = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
        poller.register(self.sock, read_only)
        events = poller.poll(self.timeout)
        if not events:
            printd("Timed out on select", file=sys.stderr)
            raise RuntimeError("Timed out on select")
        # Only self.sock is registered, so every event refers to it.
        for _fd, flag in events:
            if flag & (select.POLLIN | select.POLLPRI):
                try:
                    res = self.sock.recv(READ_SIZE)
                except OSError as e:
                    raise RuntimeError(
                        f"sock recv() failed, likely closed: {e.args[0]}"
                    ) from e
                if res == b'':
                    # An empty read on a readable socket means end of stream.
                    self.sock.close()
                    raise RuntimeError("sock recv() returned None, likely closed")
                self.data += res
            elif flag & (select.POLLHUP | select.POLLERR):
                self.sock.close()
                raise RuntimeError("socket failed during read")

    def process_msgs(self):
        """Move every complete line from the byte buffer onto the queue.

        Whatever follows the last newline (possibly nothing) stays in
        ``self.data`` until more bytes arrive.
        """
        *lines, self.data = self.data.split(b'\n')
        self.msgs.extend(lines)

    def send(self, ignore):
        """Return the next message, reading from the socket as needed.

        This is what ``next()`` / ``for`` end up calling; the argument is
        unused.

        Raises:
            StopIteration: when ``read_msg()`` fails for any reason.
        """
        while True:
            if self.msgs:
                # pop(0) keeps arrival order when one recv() carried several
                # lines; undecodable bytes are replaced rather than raising.
                return self.msgs.pop(0).decode(errors="replace")
            try:
                self.read_msg()
            except RuntimeError as e:
                printd(f"Exception in send() {e}", file=sys.stderr)
                raise StopIteration from e
            self.process_msgs()

    def throw(self, type=None, value=None, traceback=None):
        """Terminate iteration; any thrown exception simply ends the stream."""
        raise StopIteration
def run(openvpn_ctrl_path='/var/etc/openvpn/client1/sock'):
    """Answer OpenVPN auth requests on the management socket with a TOTP code.

    Connects to the UNIX stream socket at ``openvpn_ctrl_path``, retrying
    every 3 seconds until the connection succeeds.  Each message read from
    the socket is checked for the ``>PASSWORD:Need 'Auth' username/password``
    prompt; when seen, ``USERNAME`` and a freshly generated TOTP code are
    sent back.  Once the message stream ends the socket is closed and the
    function returns.

    Args:
        openvpn_ctrl_path: Filesystem path of the management socket.
    """
    while True:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(openvpn_ctrl_path)
        except OSError as msg:
            # Release the unused descriptor before retrying.
            sock.close()
            printd(f"Failed to connect to {openvpn_ctrl_path}: {msg}", file=sys.stderr)
            time.sleep(3)
            continue

        # The context manager guarantees the socket is closed however the
        # message loop ends.
        with sock:
            printd("Connected, entering msg loop", file=sys.stderr)
            sock.setblocking(False)
            for msg in messages(sock, SELECT_TIMEOUT):
                printd(f"Message: {msg}")
                if msg.startswith(">PASSWORD:Need 'Auth' username/password"):
                    printi("Sending asked for credentials")
                    token = pyotp.TOTP(TOTP_TOKEN).now()
                    need_type = 'Auth'
                    reply_suffix = '\n'
                    reply_user = f'username "{need_type}" {USERNAME}{reply_suffix}'
                    reply_pass = f'password "{need_type}" {token}{reply_suffix}'
                    sock.sendall(f'{reply_user}{reply_pass}'.encode())

        printd("Disconnected", file=sys.stderr)
        # Don't reconnect. This works well with cron running this in a loop
        # NOTE(review): the original indentation was lost; this assumes the
        # break sits at while-loop level (after the message loop) — confirm.
        break
# Script entry point: run the responder against the default socket path.
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment