Skip to content

Instantly share code, notes, and snippets.

@huwcbjones
Last active April 23, 2022 13:06
Show Gist options
  • Save huwcbjones/7505eb0b48eba7e829737ae2376fb095 to your computer and use it in GitHub Desktop.
Save huwcbjones/7505eb0b48eba7e829737ae2376fb095 to your computer and use it in GitHub Desktop.
Monitors the internet/VPN connection and executes a command when the VPN connection drops or the internet connection resumes.
#!/usr/bin/env python3
import requests
import re
import os
import subprocess
from typing import Optional
# START VARIABLES
# Domain that routes to the remote IP (dynamic DNS). is_vpned() compares the
# current public IP against this domain's A record.
dyn_dns_domain = "CHANGE ME"
# Shell command line that restarts the VPN and any other applications.
# It is executed with shell=True, so shell operators such as && are honoured.
restart_service_cmd = "RESTART VPN_SERVICE && ..."
# Argument list (executed without a shell) that reports the VPN service
# status through its exit code: 0 means running.
vpn_status_cmd = ["systemctl", "is-active", "--quiet", "CHANGEME"]
# Internet-facing network interface; passed to `ping -I` for the initial
# internet reachability check.
network_interface = "ens192"
# END VARIABLES
##
## Code below *shouldn't* need editing
##
def get_remote_ip() -> str:
    """Fetch http://ipinfo.io/ip and return the response body, stripped.

    Returns:
        The whitespace-stripped text of the response (the caller's IP
        address as reported by that service).

    Raises:
        Exception: when the response status code is not 200.

    Any exception raised by ``requests.get`` itself (the request uses a
    one-second timeout) is not caught here.
    """
    reply = requests.get("http://ipinfo.io/ip", timeout=1)
    if reply.status_code == 200:
        return reply.text.strip()
    raise Exception("Request failed")
def has_network(host: str = "10.1.2.1") -> bool:
    """Report whether the local network is up by pinging one host.

    Args:
        host: Address to ping. Defaults to 10.1.2.1, the address that was
            previously hard-coded here (presumably the LAN gateway --
            adjust to match your network).

    Returns:
        True when a single ping to *host* succeeds, otherwise False.
    """
    return ping_host(host)
def can_reach_internet(interface=None) -> bool:
    """Report whether both 1.1.1.1 and 8.8.8.8 answer a ping.

    Args:
        interface: Optional interface forwarded to ``ping_host``; None
            leaves the choice of interface to ping.

    Returns:
        True only when both hosts respond. The hosts are tried in order
        and the second one is skipped as soon as the first fails.
    """
    return all(ping_host(target, interface) for target in ("1.1.1.1", "8.8.8.8"))
def ping_host(host: str, interface: Optional[str] = None) -> bool:
    """Send a single ping to *host* and report whether it succeeded.

    Args:
        host: Hostname or IP address handed to ``ping``.
        interface: Value for ping's ``-I`` option; the option is omitted
            when this is None.

    Returns:
        True when ``ping`` exits with status 0, False for any other exit
        status. ping's stdout and stderr are discarded.
    """
    command = ["ping"]
    if interface is not None:
        command += ["-I", interface]
    # Option and value are separate argv entries. Passing "-c 1" as one
    # token only works when ping's option parser tolerates the embedded
    # space in the value.
    command += ["-c", "1", host]
    exit_code = subprocess.call(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return exit_code == 0
def get_ip_for_domain(domain: str) -> Optional[str]:
    """Resolve *domain* to an IPv4 address using ``dig +short +trace``.

    dig is invoked directly with an argument list (no shell), so the
    domain is never interpreted by a shell and no external grep is needed.

    Args:
        domain: The name whose A record should be looked up.

    Returns:
        The first dotted-quad found on the output lines that start with
        "A", or None when dig cannot be executed or prints no such line.
    """
    try:
        completed = subprocess.run(
            ["dig", "+short", "+trace", domain, "A"],
            stdout=subprocess.PIPE,
            text=True,
        )
    except OSError:
        # dig is missing or not executable: treat it like a failed lookup.
        return None
    # Keep only the lines beginning with "A", exactly what the former
    # `egrep '^A'` stage of the shell pipeline selected. dig's own exit
    # status is deliberately ignored, as it was in that pipeline.
    answer_lines = "\n".join(
        line for line in completed.stdout.splitlines() if line.startswith("A")
    )
    if found := re.search(r"[0-9]+(?:\.[0-9]+){3}", answer_lines):
        return found.group(0)
    return None
def vpn_is_running() -> bool:
    """Run ``vpn_status_cmd`` and report whether it exited with status 0.

    Returns:
        True when the command's exit status is 0, False for any other
        exit status.
    """
    return subprocess.call(vpn_status_cmd) == 0
def is_vpned() -> bool:
    """Report whether traffic appears to be routed through the VPN.

    The address returned by ``get_remote_ip()`` is compared with the
    address that ``dyn_dns_domain`` resolves to; differing addresses are
    taken to mean the traffic is leaving through the VPN.

    Returns:
        True only when both addresses were obtained and they differ.
        False when they are equal or when either lookup fails.
    """
    try:
        # Both lookups sit inside the try: the remote-IP request can raise
        # (timeout, non-200 response) and must not abort the whole check.
        current_ip = get_remote_ip()
        dyn_dns_ip = get_ip_for_domain(dyn_dns_domain)
    except Exception:
        # Best effort: any lookup failure is reported as "not on the VPN".
        return False
    if dyn_dns_ip is None:
        # A failed DNS lookup must not be mistaken for "addresses differ".
        return False
    return current_ip != dyn_dns_ip
def _report_vpn_health() -> bool:
    """Print the VPN service, reachability and routing status.

    Returns:
        True only when the VPN service is running, the internet is
        reachable, and traffic is routed via the VPN.
    """
    vpn_status = vpn_is_running()
    print("VPN Service: {}".format("Running" if vpn_status else "Stopped"))
    vpn_state = can_reach_internet()
    print("Internet via VPN: {}".format("Connected" if vpn_state else "Disconnected"))
    # The routing check needs a working connection, so skip it (and report
    # ISP routing) when the internet is unreachable.
    vpn_routing_state = is_vpned() if vpn_state else False
    print("Internet Routing: {}".format("VPN" if vpn_routing_state else "ISP"))
    return vpn_status and vpn_state and vpn_routing_state


if __name__ == "__main__":
    network_state = has_network()
    print("Network Status: {}".format("Connected" if network_state else "Disconnected"))
    internet_state = can_reach_internet(network_interface)
    print("Internet Status: {}".format("Connected" if internet_state else "Disconnected"))
    if not network_state or not internet_state:
        # No network/internet connection, so quit.
        # SystemExit is raised directly because the bare exit() helper is
        # only installed by the site module.
        raise SystemExit
    if not _report_vpn_health():
        # VPN isn't running, or internet isn't accessible or routed via
        # the VPN, so restart.
        print("Restarting services...")
        subprocess.check_call(restart_service_cmd, shell=True)
        print("Services restarted!")
        # Report the state again so the output shows whether the restart helped.
        _report_vpn_health()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment