Last active
June 17, 2022 18:36
-
-
Save B17C0D3/cd95f01df6c9f96ca9f722d4511c7ed7 to your computer and use it in GitHub Desktop.
desec.io dyndns updater
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# sudo apt install dnsutils | |
# pip install netifaces netaddr requests dnspython | |
# windows only pip install tzdata | |
# echo 'export PATH=$PATH:/home/alexander/.local/bin' >> ~/.bashrc | |
# */10 * * * * python /home/alexander/desec_dyndns.py >> /home/alexander/desec_dyndns.log | |
import subprocess | |
import requests | |
import time | |
from sys import argv | |
from datetime import datetime | |
from zoneinfo import ZoneInfo | |
import netifaces | |
from netaddr.ip import IPAddress | |
import dns.rdatatype | |
import dns.resolver | |
import dns.exception | |
token = 'xxxxxxxxxx' | |
domains = [ | |
'xxxx.dedyn.io', | |
'vpn.xxxx.dedyn.io', | |
'vpn6.xxxx.dedyn.io', | |
'plex.xxxx.dedyn.io', | |
'gitea.xxxx.dedyn.io', | |
'manga.xxxx.dedyn.io' | |
] | |
ipv4_hosts = ['xxxx.dedyn.io', 'vpn.xxxx.dedyn.io'] | |
ipv6_hosts = { | |
'vpn.xxxx.dedyn.io': '::90a7:4716:2b77:c019', | |
'vpn6.xxxx.dedyn.io': '::90a7:4716:2b77:c019', | |
'plex.xxxx.dedyn.io': '::42:c0ff:fea8:a15', | |
'gitea.xxxx.dedyn.io': '::42:c0ff:fea8:a2a', | |
'manga.xxxx.dedyn.io': '::42:c0ff:fea8:a11' | |
} | |
def get_ipv6_offline_of(prefix, domain): | |
return IPAddress(prefix + '::').__add__(IPAddress(ipv6_hosts[domain])).__str__() | |
def update_host(domain, ipv4, ipv6_prefix): | |
auth = (domain, token) | |
if (domain in ipv4_hosts) and (domain in ipv6_hosts): | |
requests.packages.urllib3.util.connection.HAS_IPV6 = True | |
ipv6 = get_ipv6_offline_of(ipv6_prefix, domain) | |
url = 'https://update6.dedyn.io' | |
params={'hostname': domain ,'myipv4': ipv4, 'myipv6': ipv6} | |
elif domain in ipv4_hosts: | |
requests.packages.urllib3.util.connection.HAS_IPV6 = False | |
url = 'https://update.dedyn.io' | |
params={'hostname': domain ,'myipv4': ipv4} | |
elif domain in ipv6_hosts: | |
requests.packages.urllib3.util.connection.HAS_IPV6 = True | |
ipv6 = get_ipv6_offline_of(ipv6_prefix, domain) | |
url = 'https://update6.dedyn.io' | |
params={'hostname': domain , 'myipv6': ipv6} | |
else: | |
pass | |
try: | |
result = requests.get(url, params = params, auth = auth, timeout=5.0) | |
except requests.exceptions.ConnectionError as ex: | |
return False | |
if result.text == 'good': | |
return True | |
else: | |
print(f'result of dns update {result}') | |
return False | |
def update_all_domains(ipv4, ipv6): | |
for d in domains: | |
if (d in ipv4_hosts) and (d in ipv6_hosts): | |
ipv6_addr = get_ipv6_offline_of(ipv6.prefix, d) | |
if not ipv4_info.check_dns(ipv4.ip, d) or not ipv6_info.check_dns(ipv6_addr, d): | |
if update_host(d, ipv4.ip, ipv6.prefix): | |
print(f'Updated {d} with {ipv4.ip} and {ipv6_addr}') | |
else: | |
print(f'Update {d} failed!') | |
time.sleep(65) | |
elif d in ipv4_hosts: | |
if not ipv4_info.check_dns(ipv4.ip, d): | |
if update_host(d, ipv4.ip, None): | |
print(f'Updated {d} with {ipv4.ip}') | |
else: | |
print(f'Update {d} failed!') | |
time.sleep(65) | |
elif d in ipv6_hosts: | |
ipv6_addr = get_ipv6_offline_of(ipv6.prefix, d) | |
if not ipv6_info.check_dns(ipv6_addr, d): | |
if update_host(d, None, ipv6.prefix): | |
print(f'Updated {d} with {ipv6_addr}') | |
else: | |
print(f'Update {d} failed!') | |
time.sleep(65) | |
else: | |
pass | |
def get_nslookup_old(domain): | |
command = ['nslookup', domain, 'ns1.desec.io'] | |
ip_addr = [] | |
pc = subprocess.run(command, capture_output=True, text=True, check=True) | |
if pc.returncode == 0: | |
if len(pc.stdout.splitlines()) < 4: | |
return None | |
lines = pc.stdout.splitlines() | |
lines.pop(0) | |
lines.pop(0) | |
lines.pop(0) | |
lines.pop(0) | |
for l in lines: | |
if l.startswith('Address'): | |
ip_addr.append(l.split()[1].strip()) | |
elif len(l) > 4: | |
ip_addr.append(l.strip()) | |
else: | |
pass | |
return ip_addr | |
else: | |
return None | |
def dns_lookup(input, type=0, timeout=5, server=['45.54.76.1', '157.53.224.1']): | |
"""Perform a simple DNS lookup, return results in a dictionary""" | |
ip_addr = [] | |
resolver = dns.resolver.Resolver() | |
resolver.timeout = float(timeout) | |
resolver.lifetime = float(timeout) | |
resolver.nameservers = server | |
if type == 0 or type == 4: | |
try: | |
records = resolver.resolve(qname=input, rdtype=dns.rdatatype.A) | |
result_a = { | |
"addrs": [ii.address for ii in records], | |
"error": "", | |
"name": input, | |
} | |
for ip in result_a['addrs']: | |
ip_addr.append(ip) | |
except dns.exception.DNSException as e: | |
pass | |
#print( { | |
# "addrs": [], | |
# "error": repr(e), | |
# "name": input, | |
#} ) | |
if type == 0 or type == 6: | |
try: | |
records = resolver.resolve(qname=input, rdtype=dns.rdatatype.AAAA) | |
result_aaaa = { | |
"addrs": [ii.address for ii in records], | |
"error": "", | |
"name": input, | |
} | |
for ip in result_aaaa['addrs']: | |
ip_addr.append(ip) | |
except dns.exception.DNSException as e: | |
pass | |
#print( { | |
# "addrs": [], | |
# "error": repr(e), | |
# "name": input, | |
#} ) | |
return ip_addr | |
class ipv4_info(): | |
def get_nslookup(domain): | |
return dns_lookup(input=domain, type=4) | |
def get_ipv4_online(): | |
try: | |
return requests.get("https://api.ipify.org", timeout=5).text | |
except requests.exceptions.ConnectionError as ex: | |
return None | |
def check_dns(ip_addr, domain): | |
result = ipv4_info.get_nslookup(domain) | |
if result == None: | |
return False | |
if ip_addr in result: | |
return True | |
else: | |
return False | |
def __init__(self) -> None: | |
self.ip = ipv4_info.get_ipv4_online() | |
class ipv6_info(): | |
def get_nslookup(domain): | |
return dns_lookup(input=domain, type=6) | |
def get_ipv6_online(): | |
try: | |
return requests.get("https://api6.ipify.org", timeout=5).text | |
except requests.exceptions.ConnectionError as ex: | |
return None | |
def check_dns(ip_addr, domain): | |
result = ipv6_info.get_nslookup(domain) | |
if result == None: | |
return False | |
if ip_addr in result: | |
return True | |
else: | |
return False | |
def __check_ipv6_offline(self, public_ipv6): | |
for ipv6address in self.ipv6_addresses: | |
if ipv6address['addr'] == public_ipv6: | |
return True | |
return False | |
def __get_link_lokal_address(self): | |
for ipv6address in self.ipv6_addresses: | |
if ipv6address['addr'].find('%') > 0: | |
return ipv6address['addr'].split('%')[0] | |
def __check_dns__(self, domain): | |
return self.public_ipv6 == ipv6_info.get_nslookup(domain) | |
def __init__(self) -> None: | |
self.status = False | |
self.default_gateway = netifaces.gateways()['default'][netifaces.AF_INET6][0] | |
self.default_interface = netifaces.gateways()['default'][netifaces.AF_INET6][1] | |
all_addresses = netifaces.ifaddresses(self.default_interface) | |
self.mac = all_addresses[netifaces.AF_LINK][0]['addr'] | |
self.ipv6_addresses = all_addresses[netifaces.AF_INET6] | |
self.lla_ipv6 = self.__get_link_lokal_address() | |
self.lla_ipv6_host = '::' + self.lla_ipv6.split('::')[1] | |
ipv6_online = ipv6_info.get_ipv6_online() | |
if ipv6_online == None: | |
print("ipv6_info.get_ipv6_online() returns None") | |
quit() | |
self.prefix = ipv6_online.split(':0:')[0] | |
public_ipv6 = IPAddress(self.prefix + '::').__add__(IPAddress(self.lla_ipv6_host)).__str__() | |
if self.__check_ipv6_offline(public_ipv6): | |
self.status = True | |
self.public_ipv6 = public_ipv6 | |
elif self.__check_ipv6_offline(ipv6_online): | |
self.status = True | |
self.public_ipv6 = ipv6_online | |
dt = datetime.now(ZoneInfo("Europe/Berlin")) | |
print(dt.strftime("%Y-%m-%d %H:%M:%S") + ' Starting update DynDNS') | |
ipv4 = ipv4_info() | |
if ipv4.ip == None: | |
print("ipv4_info.get_ipv4_online() return None") | |
quit() | |
ipv6 = ipv6_info() | |
if '-info' in argv: #if True:# | |
print("MAC {0}".format(ipv6.mac)) | |
print("IPv4 {0}".format(ipv4.ip)) | |
print("LLA {0}".format(ipv6.lla_ipv6)) | |
print("Gateway {0}".format(ipv6.default_gateway)) | |
print("Prefix {0}".format(ipv6.prefix)) | |
print("IPv6 {0}".format(ipv6.public_ipv6)) | |
print("IPv6 OK {0}".format(ipv6.status)) | |
for d in domains: | |
if d in ipv4_hosts: | |
print(f'{ipv4_info.check_dns(ipv4.ip, d).__str__():9} {d:25} {dns_lookup(d)}') | |
elif d in ipv6_hosts: | |
# using own ipv6 | |
#print("{0} {1} {2}".format(ipv6.__check_dns__(d), d, ipv6_info.get_nslookup(d) ) ) | |
# using ipv6_hosts | |
ip_addr = get_ipv6_offline_of(ipv6.prefix, d) | |
print(f'{ipv6_info.check_dns(ip_addr, d).__str__():9} {d:25} {dns_lookup(d)}') | |
else: | |
print(f'Domain {d} not found in hosts') | |
elif '-update' in argv: | |
update_all_domains(ipv4, ipv6) | |
print('Done') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment