Skip to content

Instantly share code, notes, and snippets.

@AdSegura
Last active April 5, 2023 21:37
Show Gist options
  • Save AdSegura/da93433ee4c2059fd4a3d271ad2127d3 to your computer and use it in GitHub Desktop.
Save AdSegura/da93433ee4c2059fd4a3d271ad2127d3 to your computer and use it in GitHub Desktop.
wireguard show client list
#!/usr/bin/env python3
##
# Show wireguard statistics with usernames assigned to their public keys
# $> ./wg-show.py with no options will use
# wg0_file="/etc/wireguard/wg0.conf"
# wg_iface="wg0"
# sg_to_timeout=300 seconds
#
# ./wg-show.py --help
# usage: wg-show.py [-h] [--config CONFIG] [--interface INTERFACE]
# [--since SINCE] [--all] [--full] [--json]
# optional arguments:
# -h, --help show this help message and exit
# --config CONFIG, -c CONFIG
# wireguard config file
# --interface INTERFACE, -i INTERFACE
# wireguard interface
# --since SINCE, -s SINCE
# last seen seconds to timeout connection
# --all, -a show all clients
# --full, -f show full client profile
# --json, -j json output
##
import json
import re
import subprocess
import math
import argparse
from datetime import datetime
from operator import attrgetter
wg0_file = '/etc/wireguard/wg0.conf' # wireguard config file
wg_iface = 'wg0' # wireguard interface
sg_to_timeout = 300 # seconds needed to pass for consider client is disconnected
show_all = False # only show connected clients based on sg_to_timeout
show_full = False # only show client profile resume
json_out = False # json output
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", help="wireguard config file")
parser.add_argument("--interface", "-i", help="wireguard interface")
parser.add_argument("--since", "-s", help="last seen seconds to timeout connection")
parser.add_argument("--all", "-a", action='store_true', help="show all clients")
parser.add_argument("--full", "-f", action='store_true', help="show full client profile")
parser.add_argument("--json", "-j", action='store_true', help="json output")
args = parser.parse_args()
if args.config:
print("[INFO] Reading wireguard config %s\n" % args.config)
wg0_file = args.config
if args.interface:
print("[INFO] Wireguard interface %s\n" % args.interface)
wg_iface = args.interface
if args.all:
show_all = True
if args.since:
sg_to_timeout = int(args.since)
if args.full:
show_full = True
if args.json:
json_out = True
def since(date):
now = datetime.now()
then = datetime.fromtimestamp(int(date))
duration = now - then
return duration
def read_wg0(file):
data = {}
names = []
with open(file) as cnf:
for line in cnf:
search_name = re.search('# Name = (.+?)\n', line)
if search_name:
names.append(search_name.group(1).strip('# \n'))
search_key = re.search('PublicKey = (.+?)\n', line)
if search_key:
data[names.pop(0)] = search_key.group(1).strip(' ')
return data
def status():
# read wg config return {"client1": pub_key}
clients = read_wg0(wg0_file)
status_list = []
stats = subprocess.run(['sudo', 'wg', 'show', wg_iface, 'dump'], stdout=subprocess.PIPE) \
.stdout.decode('utf-8')
stats = stats.split('\n')
stats.pop(0)
stats.pop(len(stats) - 1)
for line in stats:
client_ = Client(line)
for client in clients:
if client_.pub_key == clients[client]:
client_.name = client
status_list.append(client_)
# order client list based on last_handshake
status_list.sort(key=attrgetter('latest_handshake'), reverse=False)
return status_list
class Client:
def __init__(self, data):
log = data.split('\t')
self.name = None
self.pub_key = log[0]
self.shared_key = log[1]
self.endpoint = log[2]
self.allowed_ips = log[3]
self.latest_handshake = int(log[4])
self.rx = int(log[5])
self.tx = int(log[6])
self.keepalive = int(log[7])
self.last = str(self.last_seen())
self.is_online = self.online()
def last_seen(self):
if self.latest_handshake == 0:
return "never"
return since(self.latest_handshake)
def online(self):
last_seen = since(self.latest_handshake)
if last_seen.total_seconds() > sg_to_timeout:
return False
return True
def to_json(self):
return json.dumps(self, default=lambda o: o.__dict__)
def convert_size(size_bytes):
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, size_name[i])
def show_output(client):
if show_full:
print('\033[94m\033[4m[ ' + client.name.upper() + ' ]\033[0m')
print(' last_seen: \033[95m' + str(client.last_seen()).split(".")[0] + '\033[0m')
print(' upload: \033[91m' + convert_size(client.rx) + '\033[0m')
print(' download: \033[91m' + convert_size(client.tx) + '\033[0m')
print(' public-key: \033[90m' + client.pub_key + '\033[0m')
print(' shared-key: \033[90m' + client.shared_key + '\033[0m')
print(' endpoint: \033[93m' + client.endpoint + '\033[0m')
print(' allowed-ips: \033[93m' + client.allowed_ips + '\033[0m')
print(' keepalive: \033[93m' + str(client.keepalive) + '\033[0m')
else:
print('\033[94m\033[4m[ ' + client.name.upper() + ' ]\033[0m')
print(' last_seen: \033[95m' + str(client.last_seen()).split(".")[0] + '\033[0m')
print(' upload: \033[91m' + convert_size(client.rx) + '\033[0m')
print(' download: \033[91m' + convert_size(client.tx) + '\033[0m\n')
if __name__ == '__main__':
# execute wg show dump parse results
clients_status = status()
if json_out:
c = []
for cli in reversed(clients_status):
c.append(cli.to_json())
print(json.dumps(c))
else:
for cli in reversed(clients_status):
if show_all:
show_output(cli)
elif cli.online():
show_output(cli)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment