Skip to content

Instantly share code, notes, and snippets.

@mmerickel
Last active October 14, 2021 04:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mmerickel/2e5d6c1532619d371a4c0fa6d490816b to your computer and use it in GitHub Desktop.
import argparse
import boto3
from collections import defaultdict
import csv
from datetime import datetime, timedelta
import logging
from pprint import pprint
import queue
import signal
import sys
import time
import threading
# Module-level logger used by every function below; its level is set from
# the -v flag in main() (root logger verbosity is controlled separately).
log = logging.getLogger('openvpn-monitor')
def batch(items, chunk_size):
    """Yield successive slices of *items*, each at most *chunk_size* long.

    The final slice may be shorter; an empty sequence yields nothing.
    """
    start = 0
    total = len(items)
    while start < total:
        yield items[start:start + chunk_size]
        start += chunk_size
def throttle(delay):
    """Infinite pacing generator for a polling loop.

    Yields ``True`` immediately on the first iteration, then sleeps
    *delay* seconds before every subsequent yield.
    """
    yield True
    while True:
        time.sleep(delay)
        yield True
def parse_status_file(fp):
    """Parse an OpenVPN status file (tab-separated "version 3" style) from *fp*.

    Returns a dict with keys:
      - ``title``: the TITLE record payload (or None if absent)
      - ``time``: the TIME record as a ``datetime`` (or None if absent)
      - ``clients``: all CLIENT_LIST rows as dicts keyed by the HEADER columns
      - ``connecting_clients``: clients with no Virtual Address yet
      - ``connected_clients``: clients with a Virtual Address and a real
        (non-UNDEF) identity
      - ``routes``: all ROUTING_TABLE rows as dicts

    Fix vs. the original: the locals previously named ``time`` and ``type``
    shadowed the imported ``time`` module and the ``type`` builtin; they are
    renamed ``status_time`` / ``record_type``.  No behavior change.
    """
    title = None
    status_time = None
    clients = []
    routes = []
    headers = {}
    for row in csv.reader(fp, delimiter='\t'):
        record_type = row[0]
        if record_type == 'END':
            break
        elif record_type == 'TITLE':
            title = row[1]
        elif record_type == 'TIME':
            # row[1] is the human-readable time; row[2] is the epoch seconds.
            status_time = datetime.fromtimestamp(int(row[2]))
        elif record_type == 'HEADER':
            # HEADER rows name the columns for the table in row[1].
            headers[row[1]] = row[2:]
            log.debug('parsed table=%s header=%s', row[1], headers[row[1]])
        elif record_type == 'CLIENT_LIST':
            client = dict(zip(headers['CLIENT_LIST'], row[1:]))
            client['Connected Since'] = datetime.fromtimestamp(
                int(client['Connected Since (time_t)']),
            )
            client['Bytes Sent'] = int(client['Bytes Sent'])
            client['Bytes Received'] = int(client['Bytes Received'])
            clients.append(client)
        elif record_type == 'ROUTING_TABLE':
            route = dict(zip(headers['ROUTING_TABLE'], row[1:]))
            route['Last Ref'] = datetime.fromtimestamp(int(route['Last Ref (time_t)']))
            routes.append(route)
    return {
        'title': title,
        'time': status_time,
        'clients': clients,
        'connecting_clients': [
            c
            for c in clients
            if c['Virtual Address'] == ''
        ],
        'connected_clients': [
            c
            for c in clients
            if (
                c['Virtual Address']
                # UNDEF means the client has failed to authenticate
                and c['Common Name'] != 'UNDEF'
                and c['Username'] != 'UNDEF'
            )
        ],
        'routes': routes,
    }
def initialize_accumulator(status, last_acc=None):
    """Build a fresh stats accumulator seeded from one parsed *status*.

    ``last_acc`` is accepted for call-site symmetry but is not read here.
    The new accumulator is immediately primed via ``accumulate_stats`` so
    per-user state reflects the seed snapshot.
    """
    def _empty_user_stats():
        # Per-username rollup; created lazily on first sighting of a user.
        return {
            'connected_client_ids': set(),
            'net_bytes_sent': 0,
            'net_bytes_recv': 0,
        }

    connected_ids = {c['Client ID'] for c in status['connected_clients']}
    known_ids = {c['Client ID'] for c in status['clients']}
    acc = {
        'initial_time': status['time'],
        'last_time': status['time'],
        'last_status': status,
        'connected_client_ids': connected_ids,
        'known_client_ids': known_ids,
        'new_client_ids': set(),
        'new_connected_client_ids': set(),
        'failed_authentication_attempts': 0,
        'user': defaultdict(_empty_user_stats),
    }
    accumulate_stats(status, acc)
    return acc
def accumulate_stats(status, acc):
    """Fold one status snapshot into the running accumulator *acc* in place."""
    prev = acc['last_status']
    acc['last_time'] = status['time']
    acc['last_status'] = status

    connected_now = {c['Client ID'] for c in status['connected_clients']}
    connecting_now = {c['Client ID'] for c in status['connecting_clients']}

    # A client that was mid-handshake last time and now appears in neither
    # set is treated as a failed authentication attempt.
    for prev_client in prev['connecting_clients']:
        cid = prev_client['Client ID']
        if cid in connected_now or cid in connecting_now:
            continue
        log.debug(
            'detected authentication failure from client=%s, username=%s',
            cid, prev_client['Username'],
        )
        acc['failed_authentication_attempts'] += 1

    acc['connected_client_ids'] = connected_now

    fresh_ids = {
        c['Client ID']
        for c in status['clients']
        if c['Client ID'] not in acc['known_client_ids']
    }
    acc['new_client_ids'].update(fresh_ids)
    acc['known_client_ids'].update(acc['new_client_ids'])

    # A brand-new client id that shows up connected counts as a new connection.
    for cid in acc['new_client_ids'] & acc['connected_client_ids']:
        log.debug('detected new connected client id=%s', cid)
        acc['new_connected_client_ids'].add(cid)

    # Index the previous snapshot once (first occurrence wins, matching the
    # original first-match scan) to diff per-client byte counters.
    prev_by_id = {}
    for c in prev['clients']:
        prev_by_id.setdefault(c['Client ID'], c)

    for client in status['connected_clients']:
        sent = client['Bytes Sent']
        recv = client['Bytes Received']
        earlier = prev_by_id.get(client['Client ID'])
        if earlier is not None:
            # Counters are cumulative per connection; accumulate the delta.
            sent -= earlier['Bytes Sent']
            recv -= earlier['Bytes Received']
        stats = acc['user'][client['Username']]
        stats['connected_client_ids'].add(client['Client ID'])
        stats['net_bytes_sent'] += sent
        stats['net_bytes_recv'] += recv
def generate_metrics(acc, time, extra_dimensions):
    """Translate an accumulator into a list of CloudWatch ``MetricData`` dicts.

    Emits four aggregate metrics plus three per-user metrics
    (connected_clients, net_bytes_recv, net_bytes_sent).
    """
    def make_metric(name, value, unit, dimensions):
        return {
            'MetricName': name,
            'Values': [value],
            'Timestamp': time,
            'Unit': unit,
            'Dimensions': dimensions,
        }

    metrics = [
        make_metric(
            'total_connected_clients',
            len(acc['connected_client_ids']), 'Count', extra_dimensions),
        make_metric(
            'new_connected_clients',
            len(acc['new_connected_client_ids']), 'Count', extra_dimensions),
        make_metric(
            'new_connection_attempts',
            len(acc['new_client_ids']), 'Count', extra_dimensions),
        make_metric(
            'failed_authentication_attempts',
            acc['failed_authentication_attempts'], 'Count', extra_dimensions),
    ]

    for user, stats in acc['user'].items():
        def user_metric(name, unit, value):
            # Per-user metrics carry a Username dimension ahead of the extras.
            dims = [{'Name': 'Username', 'Value': user}, *extra_dimensions]
            return make_metric(name, value, unit, dims)

        metrics.append(user_metric(
            'connected_clients', 'Count', len(stats['connected_client_ids'])))
        metrics.append(user_metric('net_bytes_recv', 'Bytes', stats['net_bytes_recv']))
        metrics.append(user_metric('net_bytes_sent', 'Bytes', stats['net_bytes_sent']))
    return metrics
def upload_main(*, q, namespace, extra_dimensions, exc_info, dry_run=False):
    """Worker-thread loop: drain (timestamp, accumulator) messages from *q*
    and publish them to CloudWatch under *namespace*.

    A ``None`` message is the shutdown sentinel.  With ``dry_run`` the
    metrics are pretty-printed instead of uploaded.  Any fatal error is
    recorded into *exc_info* so the main thread can detect the worker died.
    """
    try:
        cw = boto3.client('cloudwatch')
        while True:
            msg = q.get()
            if msg is None:
                break
            now, acc = msg
            metrics = generate_metrics(acc, now, extra_dimensions)
            if dry_run:
                pprint(metrics)
                continue
            # Upload in chunks of 20 metrics per PutMetricData request.
            for chunk in batch(metrics, 20):
                try:
                    cw.put_metric_data(
                        Namespace=namespace,
                        MetricData=chunk,
                    )
                except Exception:
                    # Best-effort: log and keep going with remaining chunks.
                    log.exception('failed to upload metric data')
    except BaseException:
        log.exception('unhandled error occurred uploading metrics')
        exc_info.extend(sys.exc_info())
def main():
    """CLI entry point: poll an OpenVPN status file and periodically hand
    accumulated stats to a background CloudWatch-upload thread.

    Returns 0 on clean shutdown, 1 if the upload thread died with an error.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--status-file', required=True)
    parser.add_argument('--poll-interval', type=int, default=5)
    parser.add_argument('--cw-interval', type=int, default=60)
    parser.add_argument('--cw-namespace', default='OpenVPN')
    parser.add_argument('--dim', dest='dimensions', action='append', default=[])
    parser.add_argument('--dry-run', action='store_true')
    parser.add_argument('-v', '--verbose', action='count', default=0)
    args = parser.parse_args()
    # -v raises this module's verbosity; -vv/-vvv raise the root logger too.
    root_log_level = logging.WARNING
    local_log_level = logging.INFO
    if args.verbose >= 1:
        local_log_level = logging.DEBUG
    if args.verbose >= 2:
        root_log_level = logging.INFO
    if args.verbose >= 3:
        root_log_level = logging.DEBUG
    log.setLevel(local_log_level)
    logging.basicConfig(level=root_log_level)
    # Each --dim is a Name=Value pair added to every CloudWatch metric.
    extra_dimensions = []
    for entry in args.dimensions:
        name, value = entry.split('=', 1)
        extra_dimensions.append({
            'Name': name,
            'Value': value,
        })
    q = queue.Queue()
    upload_exc_info = []
    upload_thread = threading.Thread(
        target=upload_main,
        kwargs=dict(
            q=q,
            dry_run=args.dry_run,
            namespace=args.cw_namespace,
            extra_dimensions=extra_dimensions,
            exc_info=upload_exc_info,
        ),
    )
    # Daemon thread so a hung upload cannot keep the process alive.
    upload_thread.daemon = True
    upload_thread.start()
    stop = False
    report_interval = timedelta(seconds=args.cw_interval)
    last_report = datetime.utcnow()
    acc = None

    def handle_stop(*a, **kw):
        # Signal handler: flip the flag; the poll loop exits on next check.
        nonlocal stop
        stop = True
    signal.signal(signal.SIGTERM, handle_stop)
    signal.signal(signal.SIGINT, handle_stop)
    # Poll loop: re-read the status file every poll-interval seconds.
    for _ in throttle(args.poll_interval):
        # Also bail out if the uploader died so we don't accumulate forever.
        if stop or not upload_thread.is_alive():
            break
        with open(args.status_file) as fp:
            try:
                status = parse_status_file(fp)
            except Exception:
                # A truncated/mid-write file is expected occasionally; retry.
                log.exception('failed to parse openvpn status')
                continue
        if acc is None:
            # First successful parse just seeds the accumulator.
            acc = initialize_accumulator(status, None)
            continue
        if acc['last_time'] != status['time']:
            # Only fold in snapshots the server has actually refreshed.
            accumulate_stats(status, acc)
            log.debug("accumulator=%s", acc)
        now = datetime.utcnow()
        if now - last_report > report_interval or stop:
            # Hand the finished window to the uploader and start a new one.
            q.put((now, acc))
            acc = initialize_accumulator(status, acc)
            last_report = now
    log.info('shutting down...')
    # None is the uploader's shutdown sentinel; wait for it to drain.
    q.put(None)
    upload_thread.join()
    if upload_exc_info:
        return 1
    return 0
if __name__ == '__main__':
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment