Skip to content

Instantly share code, notes, and snippets.

@danielrivard
Last active May 17, 2023 00:59
Show Gist options
  • Save danielrivard/9107e8b9665e63a1a90f5b74db8a9b96 to your computer and use it in GitHub Desktop.
Save danielrivard/9107e8b9665e63a1a90f5b74db8a9b96 to your computer and use it in GitHub Desktop.
Designed to work with the GNOME Shell extension custom-vpn-toggler: https://gitlab.com/XavierBerger/custom-vpn-toggler
#!/usr/bin/python3
import fcntl
import socket
import struct
import sys
import time
import webbrowser
from typing import List
import dbus
from openvpn3 import (Configuration, ConfigurationManager, Session,
SessionManager)
from openvpn3.constants import StatusMajor, StatusMinor
# Name of the openvpn3 configuration this script controls; it is passed to
# OpenVpn3Manager below. Replace the placeholder with the name used when the
# profile was imported, e.g. with:
#   openvpn3 config-import --config openvpn-config.ovpn --persistent --name NAME_OF_THE_IMPORTED_CONNECTION
vpn_connection_name = 'NAME_OF_THE_IMPORTED_CONNECTION'
class OpenVpn3Manager:
    """Helper around the openvpn3 D-Bus managers for one named configuration.

    Every lookup (configuration and sessions) is keyed by ``config_name``.
    """

    def __init__(self, config_name) -> None:
        """Connect to the system D-Bus and create the openvpn3 managers.

        Args:
            config_name: Name under which the configuration was imported.
        """
        self.config_name = config_name
        bus = dbus.SystemBus()
        self.config_manager = ConfigurationManager(bus)
        self.session_manager = SessionManager(bus)

    def _get_config(self) -> Configuration:
        """Return the single configuration registered as ``config_name``.

        Prints an explanatory message and exits the process with status 1
        when no configuration, or more than one, matches the name.
        """
        paths = self.config_manager.LookupConfigName(self.config_name)
        if len(paths) == 1:
            return self.config_manager.Retrieve(paths[0])

        if len(paths) > 1:
            print(f'More than one configuration found for {self.config_name}, make sure to have only one per name')
        else:
            print(f'No configuration found for {self.config_name}, make sure to configure openvpn3 first')
        sys.exit(1)

    def get_active_sessions(self) -> List[Session]:
        """Return a session object for every session path found for ``config_name``.

        Returns:
            A (possibly empty) list, in the order the session manager reports
            the paths.
        """
        session_paths = self.session_manager.LookupConfigName(self.config_name)
        return [self.session_manager.Retrieve(path) for path in session_paths]

    def clean_non_connected_sessions(self, active_sessions: List[Session]) -> bool:
        """Disconnect every session that is not fully connected.

        Args:
            active_sessions: Sessions to inspect.

        Returns:
            True if at least one session reported CONNECTION/CONN_CONNECTED
            (such sessions are left untouched), False otherwise.
        """
        found_connected = False
        for session in active_sessions:
            status = session.GetStatus()
            if status['major'] == StatusMajor.CONNECTION and status['minor'] == StatusMinor.CONN_CONNECTED:
                found_connected = True
            else:
                # Incomplete session found, just disconnect it
                session.Disconnect()
        return found_connected

    def get_new_session(self) -> Session:
        """Create a new tunnel session from the configured profile.

        Exits the process (via ``_get_config``) when the profile cannot be
        resolved to exactly one configuration.
        """
        configuration = self._get_config()
        return self.session_manager.NewTunnel(configuration)

    def get_tunnel_ip(self, session: Session) -> str:
        """Return the IPv4 address of the session's tunnel interface.

        Args:
            session: Session whose ``device_name`` property names the interface.

        Returns:
            The address in dotted-quad notation.

        Raises:
            OSError: If the ioctl fails (for example, unknown interface).
        """
        # Get the tunnel interface name from DBUS property
        tunnel_interface = session.GetProperty('device_name')
        packed_iface = struct.pack('256s', tunnel_interface.encode('utf_8'))
        # The socket is only a handle for the ioctl; close it as soon as the
        # query is done so the descriptor is not leaked.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            # 0x8915 is SIOCGIFADDR on Linux. In the returned ifreq structure
            # the IPv4 address occupies bytes 20-23 (16-byte name, then
            # 2-byte family and 2-byte port of the sockaddr_in).
            packed_addr = fcntl.ioctl(sock.fileno(), 0x8915, packed_iface)[20:24]
        return socket.inet_ntoa(packed_addr)
# Command-line entry point: <script> start|stop|ip
#   start - open a new session unless one is already connected
#   stop  - disconnect every session of the configured profile
#   ip    - print the tunnel IPv4 address of the first connected session
if len(sys.argv) != 2 or sys.argv[1] not in ['start', 'stop', 'ip']:
    print("** Usage: %s start|stop|ip" % sys.argv[0])
    sys.exit(1)

vpn_manager = OpenVpn3Manager(vpn_connection_name)
active_sessions: List[Session] = vpn_manager.get_active_sessions()
command = sys.argv[1]

if command == 'start':
    # Incomplete sessions are dropped; an already connected one means there
    # is nothing left to do.
    if vpn_manager.clean_non_connected_sessions(active_sessions):
        print(f'There is already an active {vpn_manager.config_name} session')
        sys.exit(0)

    session = vpn_manager.get_new_session()
    # NOTE(review): presumably gives the session backend time to start before
    # Ready() is called - confirm.
    time.sleep(0.5)
    session.Ready()
    session.Connect()

    # Poll until the tunnel is up or authentication is handed to the browser.
    # The deadline keeps a failed or stalled connection attempt from hanging
    # the script forever.
    connect_timeout_seconds = 60.0
    deadline = time.monotonic() + connect_timeout_seconds
    while True:
        time.sleep(0.5)
        status = session.GetStatus()
        if StatusMajor.SESSION == status['major'] and StatusMinor.SESS_AUTH_URL == status['minor']:
            # Web-based authentication: the status message carries the URL.
            # Open it and stop polling; the user finishes in the browser.
            webbrowser.open_new_tab(status['message'])
            break
        if StatusMajor.CONNECTION == status['major'] and StatusMinor.CONN_CONNECTED == status['minor']:
            break
        if time.monotonic() >= deadline:
            print(
                f'Timed out after {connect_timeout_seconds:.0f}s waiting for '
                f'{vpn_manager.config_name} to connect',
                file=sys.stderr,
            )
            # Same policy as clean_non_connected_sessions: do not leave an
            # incomplete session behind.
            session.Disconnect()
            sys.exit(1)
elif command == 'stop':
    for session in active_sessions:
        session.Disconnect()
elif command == 'ip':
    # Only the first fully connected session is reported.
    for session in active_sessions:
        status = session.GetStatus()
        if status['major'] == StatusMajor.CONNECTION and status['minor'] == StatusMinor.CONN_CONNECTED:
            print(vpn_manager.get_tunnel_ip(session))
            break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment