Created
August 24, 2020 16:00
-
-
Save rwhelan/f46d1f6f07df71f1bd1786eda447b97f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from subprocess import Popen, PIPE | |
wg_bin = 'wg' | |
class WireGuardCommandError(Exception): pass | |
def _execute_wireguard_command(cmd, stdin=''): | |
proc = Popen([wg_bin] + cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE) | |
if stdin != '': | |
stdout, stderr = proc.communicate(stdin) | |
else: | |
stdout, stderr = proc.communicate() | |
if proc.returncode: raise WireGuardCommandError(stderr) | |
return (proc.returncode, stdout, stderr) | |
def read_config(): | |
config = {} | |
wg_dump, stderr = Popen(['wg', 'show', 'all', 'dump'], stdout = PIPE, stderr = PIPE).communicate() | |
for row in wg_dump.strip(b'\n').split(b'\n'): | |
parsed = [i.decode() for i in row.split(b'\t')] | |
if parsed == ['']: break | |
if parsed[0] not in config: | |
config[parsed[0]] = {} | |
config[parsed[0]]['peers'] = [] | |
config[parsed[0]]['private-key'] = parsed[1] | |
config[parsed[0]]['public-key'] = parsed[2] | |
config[parsed[0]]['listen-port'] = int(parsed[3]) | |
config[parsed[0]]['fwmark'] = 0 if parsed[4] == 'off' else int(parsed[4]) | |
else: | |
_peer = {} | |
_peer['public-key'] = parsed[1] | |
_peer['preshared-key'] = None if parsed[2] == '(none)' else parsed[2] | |
_peer['endpoint'] = None if parsed[3] == '(none)' else parsed[3] | |
_peer['allowed-ips'] = [] if parsed[4] == '(none)' else parsed[4].split(',') | |
_peer['latest-handshake'] = parsed[5] | |
_peer['transfer-rx'] = int(parsed[6]) | |
_peer['transfer-tx'] = int(parsed[7]) | |
_peer['persistent-keepalive'] = 0 if parsed[8] == 'off' else int(parsed[8]) | |
config[parsed[0]]['peers'].append(_peer) | |
return config | |
def set_iface(iface, config): | |
if 'private-key' in config: | |
_private_key = config['private-key'].encode() | |
config['private-key'] = '/proc/self/fd/0' | |
cmd_ops = [] | |
for k in config: | |
cmd_ops.append(k) | |
cmd_ops.append(config[k]) | |
cmd = ['set', iface] + cmd_ops | |
if 'private-key' in config: | |
return _execute_wireguard_command(cmd, _private_key)[0] | |
else: | |
return _execute_wireguard_command(cmd)[0] | |
def set_peer(iface, peer, config): | |
if 'allowed-ips' in config: | |
config['allowed-ips'] = ",".join(config['allowed-ips']) | |
cmd_ops = [] | |
for k in config: | |
cmd_ops.append(k) | |
cmd_ops.append(config[k]) | |
cmd = ['set', iface, 'peer', peer] + cmd_ops | |
return _execute_wireguard_command(cmd)[0] | |
def delete_peer(iface, peer): | |
return _execute_wireguard_command(['set', iface, 'peer', peer, 'remove'])[0] | |
def generate_private_key(): | |
return _execute_wireguard_command(['genkey'])[1].strip().decode() | |
def generate_public_key(private_key): | |
return _execute_wireguard_command(['pubkey'], stdin=private_key.encode())[1].strip().decode() | |
try: | |
private_key = open('/tmp/.wgKey', 'r').read().strip() | |
except FileNotFoundError: | |
private_key = generate_private_key() | |
open('/tmp/.wgKey', 'w').write(private_key) | |
public_key = generate_public_key(private_key) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment