Skip to content

Instantly share code, notes, and snippets.

@duketwo
Created November 21, 2022 08:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save duketwo/f445de3a7d23b93e06efdbde3b91f437 to your computer and use it in GitHub Desktop.
Save duketwo/f445de3a7d23b93e06efdbde3b91f437 to your computer and use it in GitHub Desktop.
OpenWRT 4G LTE WAN connection failover handling python Wireguard VPN
import ifaddr
import ipaddress
import logging
import threading
import time
import platform
import subprocess
# One WireGuard service is running on 100.100.100.100.
# There is one DSL line and one LTE/4G line; the goal is to swap between them automatically while keeping current connections ACTIVE.
# Set a static route to 100.100.100.100.
# Set the default route via the WireGuard device.
# Set a static route to 1.1.1.1 via the DSL connection.
# Set a static route to 1.1.1.2 via the LTE connection.
# Ping both 1.1.1.1 and 1.1.1.2 constantly to measure connection uptime.
# Change the device of the route to 100.100.100.100 depending on which connection is available.
# Send INFO-and-above records both to the file "debug.log" (relative to the
# current working directory) and to the console stream handler.
logging.basicConfig(
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler(),
    ],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Health flags written by the ping threads: 1 means the probe target failed
# three pings in a row (route_1_down -> thread id 1, route_2_down -> thread id 2).
route_1_down = route_2_down = 0

# Values recomputed by the main loop for the wwan0 interface.
gateway = ip = networkPrefix = network = None

# Last state line that was logged, used to suppress repeated identical lines.
prev_state_log = None

# Number of main-loop iterations completed so far.
ticks = 0
def get_ip(name):
    """Return the first IPv4 address of the adapter called *name*.

    Args:
        name: Adapter name; compared against the ``nice_name`` of every
            adapter returned by ``ifaddr.get_adapters()``.

    Returns:
        An ``ipaddress.IPv4Address`` for the first address of a matching
        adapter that parses as IPv4, or ``None`` when no adapter matches or
        none of its addresses is IPv4.
    """
    for adapter in ifaddr.get_adapters():
        if adapter.nice_name != name:
            continue
        for candidate in adapter.ips:
            try:
                return ipaddress.IPv4Address(candidate.ip)
            except ValueError:
                # ipaddress raises AddressValueError (a ValueError subclass)
                # for anything that is not an IPv4 address, e.g. an IPv6
                # entry; skip it and try the next one. Catching only
                # ValueError keeps KeyboardInterrupt and real bugs visible.
                continue
    return None
def has_interface_an_ip_address(name):
    """Return True when ``get_ip(name)`` finds an IPv4 address, else False."""
    address = get_ip(name)
    return address is not None
def get_network_prefix(name):
    """Return the network prefix of the first IPv4 address of adapter *name*.

    Args:
        name: Adapter name; compared against the ``nice_name`` of every
            adapter returned by ``ifaddr.get_adapters()``.

    Returns:
        The ``network_prefix`` attribute of the first address entry that
        parses as IPv4, or ``None`` when no adapter matches or none of its
        addresses is IPv4.
    """
    for adapter in ifaddr.get_adapters():
        if adapter.nice_name != name:
            continue
        for candidate in adapter.ips:
            try:
                # Parsed only to validate that this entry is IPv4; the
                # resulting object itself is not needed.
                ipaddress.IPv4Address(candidate.ip)
            except ValueError:
                # Not an IPv4 address (AddressValueError is a ValueError);
                # try the next entry.
                continue
            return candidate.network_prefix
    return None
def ping(host):
    """Probe *host* with the external ``fping`` tool.

    Runs ``fping -c 1 -t 1200 <host>`` (presumably one probe with a 1200 ms
    timeout - confirm against the installed fping) with all output discarded.

    Returns:
        True when fping exits with status 0, False otherwise.
    """
    exit_status = subprocess.call(
        ['fping', '-c', '1', '-t', '1200', host],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
    )
    return exit_status == 0
def thread_function(ip, id):
    """Ping *ip* forever and publish its health in a module-level flag.

    The results of the last three pings are kept in a small ring buffer.
    After every ping the thread sleeps (1 second for id 1, 10 seconds for
    any other id) and then sets the flag belonging to its id to 1 when all
    three stored results are failures, or to 0 otherwise.

    Args:
        ip: Host to ping.
        id: 1 updates ``route_1_down``, 2 updates ``route_2_down``; any other
            value pings but updates nothing.
    """
    global route_1_down, route_2_down

    logging.info(f'Thread Id [{id}] Ip [{ip}]: starting')
    pause = 1 if id == 1 else 10
    window = [None, None, None]
    slot = 0
    while True:
        window[slot] = ping(ip)
        slot = (slot + 1) % 3
        time.sleep(pause)
        # The initial None placeholders do not count as failures, so the flag
        # cannot be raised before three real pings have been recorded.
        flag = 1 if all(result is False for result in window) else 0
        if id == 1:
            route_1_down = flag
        if id == 2:
            route_2_down = flag
    # Never reached: the loop above has no exit.
    logging.info("Thread %s: finishing", ip)
def ensure_routes():
    """Make sure the host routes used by the two ping probes exist.

    Reads the output of ``ip route`` and, for each probe route that is not
    listed verbatim, installs it:

    * ``1.1.1.2 via <gateway> dev wwan0`` (uses the module-level ``gateway``)
    * ``1.1.1.1 via 192.168.10.1 dev br-WAN``

    ``ip route replace`` is used so the command succeeds both when no route
    to the target exists and when a stale one (e.g. via an old gateway) is
    still present.
    """
    routing_table = subprocess.getoutput('ip route')

    lte_probe = f'1.1.1.2 via {gateway} dev wwan0'
    if lte_probe not in routing_table:
        logging.info(f'Adding route to {lte_probe}')
        subprocess.getoutput(f'ip route replace {lte_probe}')

    dsl_probe = '1.1.1.1 via 192.168.10.1 dev br-WAN'
    if dsl_probe not in routing_table:
        # The message now names br-WAN, the device the route really uses.
        logging.info(f'Adding route to {dsl_probe}')
        subprocess.getoutput(f'ip route replace {dsl_probe}')
def enable_route_wwan():
    """Send traffic for 100.100.100.100 through the LTE link (wwan0).

    Uses the module-level ``gateway`` as next hop. ``ip route replace``
    swaps the route in a single step; the previous ``del && add`` chain
    never added the new route when the delete failed (no route present) and
    left a moment without any route in between.
    """
    subprocess.getoutput(f'ip route replace 100.100.100.100 via {gateway} dev wwan0')
def enable_route_dsl():
    """Send traffic for 100.100.100.100 through the DSL link (br-WAN).

    The next hop is 192.168.10.1. ``ip route replace`` swaps the route in a
    single step; the previous ``del && add`` chain never added the new route
    when the delete failed (no route present) and left a moment without any
    route in between.
    """
    subprocess.getoutput('ip route replace 100.100.100.100 via 192.168.10.1 dev br-WAN')
def is_route_dsl_active():
    """Return True when ``ip route`` lists 100.100.100.100 via 192.168.10.1."""
    dsl_marker = '100.100.100.100 via 192.168.10.1'
    return dsl_marker in subprocess.getoutput('ip route')
# One monitor thread per probe target: id 1 watches 1.1.1.1, id 2 watches 1.1.1.2.
h1 = threading.Thread(target=thread_function, args=('1.1.1.1', 1))
h2 = threading.Thread(target=thread_function, args=('1.1.1.2', 2))

logging.info("Adding 1.1.1.1 thread.")
h1.start()

logging.info("Adding 1.1.1.2 thread.")
h2.start()
def reset_mobile_device():
    """Restart the LTE connection through a chain of shell commands.

    Takes interface ``4G`` down, writes two ``AT^NDISDUP`` commands to
    /dev/ttyUSB1 with one-second pauses in between, then brings ``4G`` up
    again. The steps are joined with ``&&``, so a failing step stops the
    rest of the chain.
    """
    logging.info('Resetting the mobile device.')
    # Change the device name '4G' and the APN 'internet' to match your setup.
    # NOTE(review): the \" around internet reach the shell as plain double
    # quotes nested inside a double-quoted word, so they are probably stripped
    # before the text reaches the modem - confirm the modem accepts that.
    steps = [
        'ifdown 4G',
        'sleep 1',
        'echo -ne "AT^NDISDUP=1,0\r\n" > /dev/ttyUSB1',
        'sleep 1',
        'echo -ne "AT^NDISDUP=1,1,\"internet\"\r\n" > /dev/ttyUSB1',
        'sleep 1',
        'ifup 4G',
    ]
    subprocess.getoutput(' && '.join(steps))
# Main control loop: once per second, re-evaluate both uplinks and point the
# route to 100.100.100.100 at whichever one is usable.
# NOTE(review): the pasted source had lost its indentation; the nesting below
# was reconstructed from the statement order - confirm against the original.
while True:
    ip = get_ip("wwan0")
    if ip is not None:
        networkPrefix = get_network_prefix("wwan0")
        network = ipaddress.IPv4Network(f'{ip}/{networkPrefix}', strict=False)
        # Takes the second-to-last address of the subnet as the gateway;
        # presumably that is where the LTE modem sits - verify for your device.
        gateway = network[network.num_addresses - 2]
        ensure_routes()
        if not route_1_down or not route_2_down:
            # DSL probe is down, LTE probe is up, DSL route still active:
            # fail over to LTE.
            if route_1_down and is_route_dsl_active() and not route_2_down:
                logging.info('Enabling mobile route.')
                enable_route_wwan()
            # DSL probe is up (again) but the DSL route is not active:
            # switch back to DSL.
            if not route_1_down and not is_route_dsl_active():
                logging.info('Enabling DSL route.')
                enable_route_dsl()
        current_state_log = f'Route 1.1.1.1 down [{route_1_down}] Route 1.1.1.2 down [{route_2_down}] Is DSL route active [{is_route_dsl_active()}]'
        # Log the state only when it differs from the last logged line.
        if current_state_log != prev_state_log:
            logging.info(current_state_log)
            prev_state_log = current_state_log
    else:
        # Fixed: the function has to be called. The bare name
        # `is_route_dsl_active` is always truthy, so this branch never ran.
        if not is_route_dsl_active():
            logging.info('Device wwan0 has no ip address, but the default DSL route is not active. Activating the default DSL route.')
            enable_route_dsl()
    # While the LTE probe is down, reset the modem on every 60th iteration.
    if route_2_down and ticks % 60 == 0:
        reset_mobile_device()
    time.sleep(1)
    ticks = ticks + 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment