Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Check ProtonVPN peer
#!/usr/bin/env python3
import json
import os
import shlex
import socket
import sys

import requests
# --- settings ---
# Invocation: <script> COUNTRY STATE
# Both command-line arguments are read unconditionally from sys.argv below,
# so leaving either one out raises IndexError before anything else runs.
verbose=True # print progress messages while working (False = act silently)
tier=2 # only servers whose 'Tier' field equals this value are considered
test=5 # how many of the lowest-load servers get pinged to compare round-trip time
threshold=50 # the current peer is only reconsidered when its load is above this number
country = str(sys.argv[1]) # country abbreviation as used by ProtonVPN, taken from the first argument
state = str(sys.argv[2]) # state abbreviation as used by ProtonVPN, taken from the (mandatory) second argument; may be ""
peer_name='ProtonVPN-'+country.upper() # peer name as entered in RouterOS: 'ProtonVPN-XX' where XX is the upper-cased country code
ssh_host='192.168.1.1' # the IP address of your Mikrotik
ssh_port=22 # the SSH port of your Mikrotik
ssh_identity='-i ~/.ssh/id_rsa' # extra ssh option naming a specific key; inserted verbatim into the ssh command line
ssh_user='rob' # the username with which to access your Mikrotik
# ----------------
def runOnRouter(command):
    """Run a RouterOS command on the Mikrotik over ssh.

    The connection details come from the module-level ssh_* settings.
    Whatever ssh prints goes straight to this process's stdout/stderr and
    the exit status is not inspected.

    Args:
        command: RouterOS command line to execute on the router.
    """
    # shlex.quote hands the command to ssh as exactly one argument even when
    # it contains quotes, '$' or backticks, so the local shell never
    # interprets any part of it.
    ssh_command = ('ssh -l ' + ssh_user + ' ' + ssh_identity
                   + ' -p ' + str(ssh_port) + ' ' + ssh_host
                   + ' ' + shlex.quote(command))
    os.system(ssh_command)
def getFromRouter(command,filter):
    """Run a RouterOS command over ssh and return the grep-filtered output.

    Args:
        command: RouterOS command line to execute on the router.
        filter: pattern appended verbatim after ``grep``; only output lines
            matching it are returned.

    Returns:
        The matching output lines as one string ('' when nothing matched or
        the command produced no output).
    """
    # shlex.quote keeps the local shell from interpreting the command text.
    # `filter` is deliberately left as the caller wrote it.
    ssh_command = ('ssh -l ' + ssh_user + ' ' + ssh_identity
                   + ' -p ' + str(ssh_port) + ' ' + ssh_host
                   + ' ' + shlex.quote(command) + ' | grep ' + filter)
    # The context manager closes the pipe (and reaps the child process)
    # as soon as the output has been read.
    with os.popen(ssh_command) as stream:
        return stream.read()
# --- main script ---
# Flow: read the peer address configured on the router, download the
# ProtonVPN server list, decide whether the current peer should be reviewed
# and, if so, ping the least-loaded candidates and point the router's IPsec
# peer at the one with the lowest round-trip time.

# getFromRouter returns only the output lines grep matches on '.protonvpn.com'.
router_raw = getFromRouter('/ip ipsec peer print where name=' + peer_name, '.protonvpn.com')

# The address is the text between the first 'address=' and the last
# ' profile'.  When either marker is missing (peer not found, ssh failed)
# the result is '' rather than a slice taken at find()'s -1 sentinel.
_, address_marker, after_address = router_raw.partition('address=')
current_peer = after_address.rpartition(' profile')[0].strip() if address_marker else ''

if verbose: print('Downloading server metrics')
# The timeout stops a stalled connection from hanging the script forever;
# raise_for_status() reports an HTTP error as such instead of letting it
# surface later as a KeyError on the response body.
response = requests.get('https://api.protonvpn.ch/vpn/logicals', timeout=30)
response.raise_for_status()
# Stored under its own name so the json module stays usable afterwards.
metrics = json.loads(response.text)

hostnames_matched = []
loads_matched = []
hostnames_valid = []
loads_valid = []
hostnames_best = []
loads_best = []
rtts_best = []

if verbose: print('-', len(metrics['LogicalServers']), 'total servers')

# The country is compared case-insensitively, in line with the upper()/lower()
# normalisation applied to it for the peer name and the domain prefix.
domain_prefix = country.lower() + '-' + state.lower()
for server in metrics['LogicalServers']:
    if (server['ExitCountry'].upper() == country.upper()
            and server['Domain'].startswith(domain_prefix)
            and server['Tier'] == tier):
        hostnames_matched.append(server['Domain'])
        loads_matched.append(server['Load'])

if verbose: print('-', len(hostnames_matched), 'match criteria')
if verbose: print('Curent', country.upper(), 'peer:')
if verbose: print('- Set to', current_peer)

if current_peer in hostnames_matched:
    if verbose: print('- Found in list')
    # If a hostname is listed more than once, the last entry's load is used.
    current_load = dict(zip(hostnames_matched, loads_matched))[current_peer]
    # Only a load strictly above the threshold triggers a review; a load
    # exactly on the threshold counts as "no review needed".
    optimize = current_load > threshold
    if optimize:
        if verbose: print('- Current load at', str(current_load)+'%', 'is above review threshold', str(threshold)+'%')
    else:
        relation = 'below' if current_load < threshold else 'at'
        if verbose: print('- Load at', str(current_load)+'%', 'is', relation, 'review threshold', str(threshold)+'%')
else:
    if verbose: print('- Not found in list')
    optimize = True

if optimize:
    if verbose: print('Finding better server')

    # Keep only the candidates whose hostname currently resolves.
    for hostname, load in zip(hostnames_matched, loads_matched):
        try:
            socket.gethostbyname(hostname)
        except (OSError, UnicodeError):
            continue
        hostnames_valid.append(hostname)
        loads_valid.append(load)

    if test > len(hostnames_valid):
        test = len(hostnames_valid)
    if verbose: print('-', len(hostnames_valid), 'are valid hostnames')
    if verbose: print('-', test, 'lowest load servers being tested')

    # A stable sort by load visits servers in the same order as repeatedly
    # picking the first remaining minimum.
    candidates = sorted(zip(hostnames_valid, loads_valid), key=lambda pair: pair[1])[:test]
    for position, (hostname, load) in enumerate(candidates, start=1):
        # The round-trip time is the fifth '/'-separated field of ping's last
        # output line.  The hostname comes from the API response, so it is
        # shell-quoted before being placed on the command line.
        ping_command = "ping -c 10 " + shlex.quote(hostname) + " | tail -n 1 | awk -F'/' '{ print $5 }'"
        with os.popen(ping_command) as stream:
            rtt_text = stream.read().rstrip()
        # Compare round-trip times numerically; as strings '9.8' would sort
        # above '10.2' and an empty result would always win.
        try:
            rtt = float(rtt_text)
        except ValueError:
            if verbose: print(' '+str(position)+': '+hostname+', load='+str(load)+'%, no usable ping result, skipped')
            continue
        hostnames_best.append(hostname)
        loads_best.append(load)
        rtts_best.append(rtt)
        if verbose: print(' '+str(position)+': '+hostname+', load='+str(load)+'%, rtt='+rtt_text)

    if not rtts_best:
        # Nothing could be measured, so there is nothing to switch to.
        if verbose: print('No candidate server could be measured, leaving the peer unchanged')
    else:
        best_server = hostnames_best[rtts_best.index(min(rtts_best))]
        if best_server == current_peer:
            if verbose: print('Current peer has the best performance')
        else:
            if verbose: print('Found a better server')
            if verbose: print('- Disabling', current_peer)
            runOnRouter('/ip ipsec peer disable ' + peer_name)
            if verbose: print('- Changing', country, 'peer to', best_server)
            runOnRouter('/ip ipsec peer set address=' + best_server + ' ' + peer_name)
            if verbose: print('- Enabling', best_server)
            runOnRouter('/ip ipsec peer enable ' + peer_name)
else:
    if verbose: print('No need to find a better server')

if verbose: print('Done')
@dasnolojy
Copy link

dasnolojy commented May 29, 2022

Here is the Windows version.

#!/usr/bin/env python3

import requests
import json
import sys
import os
import socket

# --- settings ---
# Invocation: <script> COUNTRY STATE
# Both command-line arguments are read unconditionally from sys.argv below,
# so leaving either one out raises IndexError before anything else runs.

verbose=True                                # print progress messages while working (False = act silently)
tier=1                                      # only servers whose 'Tier' field equals this value are considered
test=5                                      # how many of the lowest-load servers get pinged to compare round-trip time
threshold=40                                # the current peer is only reconsidered when its load is above this number
country = str(sys.argv[1])                  # country abbreviation as used by ProtonVPN, taken from the first argument
state = str(sys.argv[2])                    # state abbreviation as used by ProtonVPN, taken from the (mandatory) second argument; may be ""
peer_name='ProtonVPN-'+country.upper()      # peer name as entered in RouterOS: 'ProtonVPN-XX' where XX is the upper-cased country code
ssh_host='192.168.88.1'                      # the IP address of your Mikrotik
ssh_port=22                                 # the SSH port of your Mikrotik
ssh_identity='-i c:\\users\\user\\.ssh\\id_rsa'             # extra ssh option naming a specific key; inserted verbatim into the ssh command line
ssh_user='admin'                              # the username with which to access your Mikrotik

# ----------------

def runOnRouter(command):
    """Run a RouterOS command on the Mikrotik over ssh.

    Connection details are taken from the module-level ssh_* settings; the
    command is passed to ssh wrapped in double quotes.  The exit status is
    not inspected and nothing is returned.
    """
    pieces = [
        'ssh -l', ssh_user,
        ssh_identity,
        '-p', str(ssh_port),
        ssh_host,
        '"' + command + '"',
    ]
    os.system(' '.join(pieces))

def getFromRouter(command,filter):
    """Run a RouterOS command over ssh and return the findstr-filtered output.

    Args:
        command: RouterOS command line to execute on the router; passed to
            ssh wrapped in double quotes.
        filter: text appended verbatim after ``findstr``; only output lines
            matching it are returned.

    Returns:
        The matching output lines as one string ('' when nothing matched or
        the command produced no output).
    """
    ssh_command = 'ssh -l '+ssh_user+' '+ssh_identity+' -p '+str(ssh_port)+' '+ssh_host+' "'+command+'" | findstr '+filter
    # The context manager closes the pipe (and reaps the child process)
    # as soon as the output has been read.
    with os.popen(ssh_command) as stream:
        return stream.read()

# --- main script ---
# Flow: read the peer address configured on the router, download the
# ProtonVPN server list, decide whether the current peer should be reviewed
# and, if so, ping the least-loaded candidates and point the router's IPsec
# peer at the one with the lowest round-trip time.

# getFromRouter returns only the output lines findstr matches on '.protonvpn.net'.
router_raw = getFromRouter('/ip ipsec peer print where name='+peer_name, '.protonvpn.net')

# The address is the text between the first 'address=' and the last
# ' profile='.  When either marker is missing (peer not found, ssh failed)
# the result is '' rather than a slice taken at find()'s -1 sentinel.
_, address_marker, after_address = router_raw.partition('address=')
current_peer = after_address.rpartition(' profile=')[0].strip() if address_marker else ''

if verbose: print('Downloading server metrics')

# The timeout stops a stalled connection from hanging the script forever;
# raise_for_status() reports an HTTP error as such instead of letting it
# surface later as a KeyError on the response body.
response = requests.get('https://api.protonvpn.ch/vpn/logicals', timeout=30)
response.raise_for_status()
# Stored under its own name so the json module stays usable afterwards.
metrics = json.loads(response.text)

hostnames_matched = []
loads_matched = []

hostnames_valid = []
loads_valid = []

hostnames_best = []
loads_best = []
rtts_best = []

if verbose: print('-', len(metrics['LogicalServers']), 'total servers')

# The country is compared case-insensitively, in line with the upper()/lower()
# normalisation applied to it for the peer name and the domain prefix.
domain_prefix = country.lower()+'-'+state.lower()
for server in metrics['LogicalServers']:
    if (server['ExitCountry'].upper() == country.upper()
            and server['Domain'].startswith(domain_prefix)
            and server['Tier'] == tier):
        hostnames_matched.append(server['Domain'])
        loads_matched.append(server['Load'])

if verbose: print('-', len(hostnames_matched), 'match criteria')

if verbose: print('Curent', country.upper(), 'peer:')
if verbose: print('- Set to', current_peer)

if current_peer in hostnames_matched:

    if verbose: print('- Found in list')

    # If a hostname is listed more than once, the last entry's load is used.
    current_load = dict(zip(hostnames_matched, loads_matched))[current_peer]
    # Only a load strictly above the threshold triggers a review; a load
    # exactly on the threshold counts as "no review needed".
    optimize = current_load > threshold
    if optimize:
        if verbose: print('- Current load at', str(current_load)+'%', 'is above review threshold', str(threshold)+'%')
    else:
        relation = 'below' if current_load < threshold else 'at'
        if verbose: print('- Load at', str(current_load)+'%', 'is', relation, 'review threshold', str(threshold)+'%')

else:
    if verbose: print('- Not found in list')
    optimize = True

if optimize:
    if verbose: print('Finding better server')

    # Keep only the candidates whose hostname currently resolves.
    for hostname, load in zip(hostnames_matched, loads_matched):
        try:
            socket.gethostbyname(hostname)
        except (OSError, UnicodeError):
            continue
        hostnames_valid.append(hostname)
        loads_valid.append(load)

    if test > len(hostnames_valid):
        test = len(hostnames_valid)

    if verbose: print('-', len(hostnames_valid), 'are valid hostnames')
    if verbose: print('-', test, 'lowest load servers being tested')

    # A stable sort by load visits servers in the same order as repeatedly
    # picking the first remaining minimum.
    candidates = sorted(zip(hostnames_valid, loads_valid), key=lambda pair: pair[1])[:test]
    for position, (hostname, load) in enumerate(candidates, start=1):

        # NOTE(review): the hostname comes from the API response and is put
        # on a shell command line unquoted; it has at least passed name
        # resolution above.
        with os.popen("ping -n 5 "+hostname+" | findstr Minimum") as stream:
            rtt_raw = stream.read()

        # The round-trip time is the text between 'Minimum = ' and
        # 'ms, Maximum' on the line findstr kept.
        _, minimum_marker, after_minimum = rtt_raw.partition('Minimum = ')
        rtt_text = after_minimum.rpartition('ms, Maximum')[0] if minimum_marker else ''

        # Compare round-trip times numerically; as strings '9' would sort
        # above '10' and an empty result would always win.
        try:
            rtt = float(rtt_text)
        except ValueError:
            if verbose: print('  '+str(position)+': '+hostname+', load='+str(load)+'%, no usable ping result, skipped')
            continue

        hostnames_best.append(hostname)
        loads_best.append(load)
        rtts_best.append(rtt)
        if verbose: print('  '+str(position)+': '+hostname+', load='+str(load)+'%, rtt='+rtt_text)

    if not rtts_best:
        # Nothing could be measured, so there is nothing to switch to.
        if verbose: print('No candidate server could be measured, leaving the peer unchanged')
    else:
        best_server = hostnames_best[rtts_best.index(min(rtts_best))]

        if best_server == current_peer:
            if verbose: print('Current peer has the best performance')
        else:
            if verbose: print('Found a better server')
            if verbose: print('- Disabling', current_peer)
            runOnRouter('/ip ipsec peer disable '+peer_name)
            if verbose: print('- Changing', country, 'peer to', best_server)
            runOnRouter('/ip ipsec peer set address='+best_server+' '+peer_name)
            if verbose: print('- Enabling', best_server)
            runOnRouter('/ip ipsec peer enable '+peer_name)

else:
    if verbose: print('No need to find a better server')

if verbose: print('Done')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment