Skip to content

Instantly share code, notes, and snippets.

@rwhelan
Created August 24, 2020 16:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rwhelan/f46d1f6f07df71f1bd1786eda447b97f to your computer and use it in GitHub Desktop.
Save rwhelan/f46d1f6f07df71f1bd1786eda447b97f to your computer and use it in GitHub Desktop.
import json
from subprocess import Popen, PIPE
wg_bin = 'wg'
class WireGuardCommandError(Exception): pass
def _execute_wireguard_command(cmd, stdin=''):
proc = Popen([wg_bin] + cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE)
if stdin != '':
stdout, stderr = proc.communicate(stdin)
else:
stdout, stderr = proc.communicate()
if proc.returncode: raise WireGuardCommandError(stderr)
return (proc.returncode, stdout, stderr)
def read_config():
config = {}
wg_dump, stderr = Popen(['wg', 'show', 'all', 'dump'], stdout = PIPE, stderr = PIPE).communicate()
for row in wg_dump.strip(b'\n').split(b'\n'):
parsed = [i.decode() for i in row.split(b'\t')]
if parsed == ['']: break
if parsed[0] not in config:
config[parsed[0]] = {}
config[parsed[0]]['peers'] = []
config[parsed[0]]['private-key'] = parsed[1]
config[parsed[0]]['public-key'] = parsed[2]
config[parsed[0]]['listen-port'] = int(parsed[3])
config[parsed[0]]['fwmark'] = 0 if parsed[4] == 'off' else int(parsed[4])
else:
_peer = {}
_peer['public-key'] = parsed[1]
_peer['preshared-key'] = None if parsed[2] == '(none)' else parsed[2]
_peer['endpoint'] = None if parsed[3] == '(none)' else parsed[3]
_peer['allowed-ips'] = [] if parsed[4] == '(none)' else parsed[4].split(',')
_peer['latest-handshake'] = parsed[5]
_peer['transfer-rx'] = int(parsed[6])
_peer['transfer-tx'] = int(parsed[7])
_peer['persistent-keepalive'] = 0 if parsed[8] == 'off' else int(parsed[8])
config[parsed[0]]['peers'].append(_peer)
return config
def set_iface(iface, config):
if 'private-key' in config:
_private_key = config['private-key'].encode()
config['private-key'] = '/proc/self/fd/0'
cmd_ops = []
for k in config:
cmd_ops.append(k)
cmd_ops.append(config[k])
cmd = ['set', iface] + cmd_ops
if 'private-key' in config:
return _execute_wireguard_command(cmd, _private_key)[0]
else:
return _execute_wireguard_command(cmd)[0]
def set_peer(iface, peer, config):
if 'allowed-ips' in config:
config['allowed-ips'] = ",".join(config['allowed-ips'])
cmd_ops = []
for k in config:
cmd_ops.append(k)
cmd_ops.append(config[k])
cmd = ['set', iface, 'peer', peer] + cmd_ops
return _execute_wireguard_command(cmd)[0]
def delete_peer(iface, peer):
return _execute_wireguard_command(['set', iface, 'peer', peer, 'remove'])[0]
def generate_private_key():
return _execute_wireguard_command(['genkey'])[1].strip().decode()
def generate_public_key(private_key):
return _execute_wireguard_command(['pubkey'], stdin=private_key.encode())[1].strip().decode()
try:
private_key = open('/tmp/.wgKey', 'r').read().strip()
except FileNotFoundError:
private_key = generate_private_key()
open('/tmp/.wgKey', 'w').write(private_key)
public_key = generate_public_key(private_key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment