Last active
May 17, 2023 00:59
-
-
Save danielrivard/9107e8b9665e63a1a90f5b74db8a9b96 to your computer and use it in GitHub Desktop.
Designed to work with the GNOME Shell extension custom-vpn-toggler: https://gitlab.com/XavierBerger/custom-vpn-toggler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import fcntl | |
import socket | |
import struct | |
import sys | |
import time | |
import webbrowser | |
from typing import List | |
import dbus | |
from openvpn3 import (Configuration, ConfigurationManager, Session, | |
SessionManager) | |
from openvpn3.constants import StatusMajor, StatusMinor | |
# Name of the openvpn3 configuration profile this script controls.
# The profile has to be imported beforehand, for example with:
#   openvpn3 config-import --config openvpn-config.ovpn --persistent --name NAME_OF_THE_IMPORTED_CONNECTION
# The value below must be the same name that was passed to --name.
vpn_connection_name = 'NAME_OF_THE_IMPORTED_CONNECTION'
class OpenVpn3Manager:
    """Convenience wrapper around the openvpn3 D-Bus managers for one profile.

    An instance is bound to a single configuration name and uses the
    openvpn3 ``ConfigurationManager`` and ``SessionManager`` (both created
    on the D-Bus system bus) to look up the configuration, list its
    sessions, start new tunnels and query the tunnel address.
    """

    # Linux ioctl request SIOCGIFADDR: "get the IPv4 address of an interface".
    _SIOCGIFADDR = 0x8915

    def __init__(self, config_name) -> None:
        """Connect to the system bus and remember *config_name*.

        Args:
            config_name: Name of the imported openvpn3 configuration.
        """
        self.config_name = config_name
        bus = dbus.SystemBus()
        self.config_manager = ConfigurationManager(bus)
        self.session_manager = SessionManager(bus)

    def _get_config(self) -> Configuration:
        """Return the single configuration registered under ``config_name``.

        Prints a message and terminates the process with exit status 1 when
        the name matches no configuration or more than one.
        """
        cfg = self.config_manager.LookupConfigName(self.config_name)
        if len(cfg) == 1:
            return self.config_manager.Retrieve(cfg[0])

        if len(cfg) > 1:
            print(f'More than one configuration found for {self.config_name}, make sure to have only one per name')
        else:
            print(f'No configuration found for {self.config_name}, make sure to configure openvpn3 first')
        sys.exit(1)

    def get_active_sessions(self) -> List[Session]:
        """Return a session object for every session path of this configuration.

        The list is empty when the session manager reports no path for
        ``config_name``.
        """
        active_paths = self.session_manager.LookupConfigName(self.config_name)
        return [self.session_manager.Retrieve(path) for path in active_paths]

    def clean_non_connected_sessions(self, active_sessions: List[Session]) -> bool:
        """Disconnect every session that is not fully connected.

        Args:
            active_sessions: Sessions to inspect, e.g. the result of
                :meth:`get_active_sessions`.

        Returns:
            True if at least one session reported
            ``StatusMajor.CONNECTION`` / ``StatusMinor.CONN_CONNECTED``,
            False otherwise.
        """
        found_connected = False
        for session in active_sessions:
            status = session.GetStatus()
            if status['major'] == StatusMajor.CONNECTION and status['minor'] == StatusMinor.CONN_CONNECTED:
                found_connected = True
            else:
                # Incomplete session found, just disconnect it
                session.Disconnect()
        return found_connected

    def get_new_session(self) -> Session:
        """Create a new tunnel session from this manager's configuration.

        Exits the process (see :meth:`_get_config`) when the configuration
        cannot be resolved unambiguously.
        """
        configuration = self._get_config()
        return self.session_manager.NewTunnel(configuration)

    def get_tunnel_ip(self, session: Session) -> str:
        """Return the IPv4 address of the session's tunnel interface.

        The interface name is read from the session's ``device_name``
        property and its address is queried with the SIOCGIFADDR ioctl.

        Args:
            session: Session whose tunnel device should be queried.

        Returns:
            The address in dotted-quad notation.

        Raises:
            OSError: If the ioctl fails (for example when the interface does
                not exist or has no IPv4 address).
        """
        # Get the tunnel interface name from DBUS property
        tunnel_interface = session.GetProperty('device_name')
        packed_iface = struct.pack('256s', tunnel_interface.encode('utf_8'))
        # The socket is only a handle for the ioctl; the context manager makes
        # sure the descriptor is released even if the ioctl raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            reply = fcntl.ioctl(sock.fileno(), self._SIOCGIFADDR, packed_iface)
        # Bytes 20..23 of the returned ifreq hold the IPv4 address
        # (16-byte name, 2-byte family, 2-byte port, then the address).
        return socket.inet_ntoa(reply[20:24])
# ---------------------------------------------------------------------------
# Command-line entry point:  <script> start|stop|ip
#   start  open a new tunnel unless one is already connected
#   stop   disconnect every session of the configured profile
#   ip     print the tunnel IPv4 address of the first connected session
# ---------------------------------------------------------------------------

# Upper bound on how long `start` polls for the session to either connect or
# request web authentication; without it a session that never reaches one of
# those states would keep this script running forever.
CONNECT_TIMEOUT_SECONDS = 60.0
# Delay between two status polls while waiting for the connection.
POLL_INTERVAL_SECONDS = 0.5

if len(sys.argv) != 2 or sys.argv[1] not in ('start', 'stop', 'ip'):
    print("** Usage: %s start|stop|ip" % sys.argv[0])
    sys.exit(1)

vpn_manager = OpenVpn3Manager(vpn_connection_name)
active_sessions: List[Session] = vpn_manager.get_active_sessions()
command = sys.argv[1]

if command == 'start':
    # Drop half-open sessions first; if a connected one remains there is
    # nothing left to do.
    if vpn_manager.clean_non_connected_sessions(active_sessions):
        print(f'There is already an active {vpn_manager.config_name} session')
        sys.exit(0)

    session = vpn_manager.get_new_session()
    # NOTE(review): presumably gives the backend time to come up before
    # Ready() is called -- confirm against the openvpn3 documentation.
    time.sleep(0.5)
    session.Ready()
    session.Connect()

    deadline = time.monotonic() + CONNECT_TIMEOUT_SECONDS
    while True:
        time.sleep(POLL_INTERVAL_SECONDS)
        status = session.GetStatus()
        major, minor = status['major'], status['minor']
        if major == StatusMajor.SESSION and minor == StatusMinor.SESS_AUTH_URL:
            # Web authentication requested: the status message carries the
            # URL, so hand it to the browser and stop waiting.
            webbrowser.open_new_tab(status['message'])
            break
        if major == StatusMajor.CONNECTION and minor == StatusMinor.CONN_CONNECTED:
            break
        if time.monotonic() >= deadline:
            # The session is left in place; the next `start` disconnects it
            # if it is still not connected, and `stop` disconnects it too.
            print(
                f'Timed out after {CONNECT_TIMEOUT_SECONDS:.0f}s waiting for '
                f'{vpn_manager.config_name} to connect',
                file=sys.stderr,
            )
            sys.exit(1)
elif command == 'stop':
    for session in active_sessions:
        session.Disconnect()
elif command == 'ip':
    for session in active_sessions:
        status = session.GetStatus()
        if status['major'] == StatusMajor.CONNECTION and status['minor'] == StatusMinor.CONN_CONNECTED:
            # Only the first connected session is reported.
            print(vpn_manager.get_tunnel_ip(session))
            break
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment