Last active
June 9, 2023 12:39
-
-
Save pavelrevak/02df7cadbd37a41bf972b16a0f796625 to your computer and use it in GitHub Desktop.
Simple dynamic DNS updater that emulates the DynDNS and No-IP server APIs and sends nsupdate-style DNS updates; its only dependency is dnspython.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Dynamic DNS updater | |
""" | |
import sys
import base64 as _base64
import argparse as _argparse
import json as _json
import logging as _logging
import http.server as _httpserver
import urllib.parse as _urlparse

# dnspython
import dns.exception
import dns.resolver
import dns.tsigkeyring
import dns.update
import dns.query
""" | |
config.json example: | |
{ | |
"name_server": "1.2.3.4", | |
"zone": "myzone.com", | |
"key": { | |
"myzone.com": [ | |
"hmac-sha512", | |
"mysecretkey==" | |
] | |
}, | |
"login": [ | |
"login", | |
"password" | |
], | |
"host_name": "127.0.0.1", | |
"server_port": 6789 | |
} | |
""" | |
# Root logging layout: first letter of the level, the message, then the
# source file and line of the logging call.
_logging.basicConfig(
    format='%(levelname).1s: %(message)s (%(filename)s:%(lineno)s)',
)

# Logger used throughout this module; DEBUG lets every record through.
log = _logging.getLogger('dyndnsupdate')
log.setLevel(_logging.DEBUG)
class DnsUpdateError(Exception):
    """Error raised when a DNS updater cannot be set up."""
class DnsUpdate:
    """Look up and modify records of one DNS zone through one name server.

    Lookups go through a dnspython resolver; modifications are sent as
    dynamic updates over TCP, signed with the TSIG keyring when a key is
    configured.
    """

    @staticmethod
    def fix_name(name):
        """Return *name* with a trailing dot appended if it has none.

        ``None`` is returned unchanged.
        """
        if name is None or name.endswith('.'):
            return name
        return name + '.'

    def __init__(self, key=None, zone=None, nameserver=None, timeout=10.0):
        """Create an updater.

        Args:
            key: mapping passed to ``dns.tsigkeyring.from_text``; a falsy
                value disables signing.
            zone: name of the zone to update (a trailing dot is added).
            nameserver: address of the server to query and update; when
                ``None`` the first server of the system resolver
                configuration is used.
            timeout: seconds to wait for the server when sending an update;
                ``None`` waits indefinitely.

        Raises:
            DnsUpdateError: no name server was given and the system
                configuration lists none.
        """
        self._keyring = dns.tsigkeyring.from_text(key) if key else None
        self._zone = self.fix_name(zone)
        # Without a bound, an unresponsive server would block the caller
        # forever inside dns.query.tcp().
        self._timeout = timeout
        if nameserver is None:
            resolver = dns.resolver.Resolver(configure=True)
            if not resolver.nameservers:
                raise DnsUpdateError('No DNS server found.')
            nameserver = resolver.nameservers[0]
        else:
            # Resolve only through the requested server, ignoring the
            # system resolver configuration.
            resolver = dns.resolver.Resolver(configure=False)
            resolver.nameservers = [nameserver]
        self._resolver = resolver
        self._nameserver = nameserver

    def get_ip(self, host, entry='A'):
        """Return the first *entry* record of *host* as text.

        Returns ``None`` when the lookup fails with any dnspython error.
        """
        try:
            result = self._resolver.resolve(host, entry)
        except dns.exception.DNSException:
            return None
        return result[0].to_text()

    def _new_update(self):
        """Build an empty update message for the configured zone."""
        return dns.update.Update(self._zone, keyring=self._keyring)

    def _send(self, update, nameserver=None):
        """Send *update* over TCP and return the server's response."""
        if nameserver is None:
            nameserver = self._nameserver
        return dns.query.tcp(update, nameserver, timeout=self._timeout)

    def set_record(self, hostname, address, entry='A', ttl=300, nameserver=None):
        """Replace the *entry* records of *hostname* with *address*.

        Args:
            hostname: record owner name (a trailing dot is added).
            address: new record data.
            entry: record type.
            ttl: time to live of the new record, in seconds.
            nameserver: server to send to; defaults to the one chosen at
                construction time.

        Returns:
            The response message returned by ``dns.query.tcp``.
        """
        update = self._new_update()
        update.replace(self.fix_name(hostname), ttl, entry, address)
        return self._send(update, nameserver)

    def delete_record(self, hostname, entry='A', nameserver=None):
        """Delete the *entry* records of *hostname*.

        Args:
            hostname: record owner name (a trailing dot is added).
            entry: record type.
            nameserver: server to send to; defaults to the one chosen at
                construction time.

        Returns:
            The response message returned by ``dns.query.tcp``.
        """
        update = self._new_update()
        update.delete(self.fix_name(hostname), entry)
        return self._send(update, nameserver)
def validate_ip(address):
    """Return True when *address* is a dotted-quad IPv4 address.

    The address must consist of exactly four dot-separated parts, each made
    only of ASCII digits and with a value from 0 to 255.
    """
    parts = address.split('.')
    if len(parts) != 4:
        return False
    for part in parts:
        # str.isdigit() also accepts non-ASCII digits (e.g. superscripts,
        # which make int() raise, or Arabic-Indic digits, which int()
        # happily converts), so check against the ASCII digits explicitly.
        if not part or any(ch not in '0123456789' for ch in part):
            return False
        # Drop leading zeros before converting so an absurdly long digit
        # run is rejected by length instead of reaching int().
        significant = part.lstrip('0')
        if len(significant) > 3 or int(significant or '0') > 255:
            return False
    return True
class MyServer(_httpserver.BaseHTTPRequestHandler):
    """HTTP request handler serving the dyndns/noip style ``/nic/update`` call.

    The configuration dictionary is read from ``self.server.conf``.
    """

    _AUTH_PREFIX = 'Basic '

    def check_login(self, auth):
        """Check an ``Authorization`` header value against the configured login.

        Args:
            auth: raw header value, or ``None`` when the header is absent.

        Returns:
            True only when *auth* is a Basic credential whose decoded
            ``login:password`` pair equals ``conf['login']``.
        """
        if not auth or not auth.startswith(self._AUTH_PREFIX):
            return False
        # Slice the scheme off. str.lstrip('Basic ') strips a *set of
        # characters*, so it would also eat leading 'B', 'a', 's', 'i', 'c'
        # characters of the base64 payload and corrupt valid credentials.
        encoded = auth[len(self._AUTH_PREFIX):]
        try:
            # binascii.Error, UnicodeDecodeError and the error raised for
            # non-ASCII input are all ValueError subclasses.
            decoded = _base64.b64decode(encoded).decode('utf-8')
        except ValueError as err:
            log.error(err)
            return False
        if ':' not in decoded:
            return False
        return decoded.split(':', 1) == self.server.conf['login']

    @staticmethod
    def _parse_query(raw_path):
        """Split a request target into ``(path, query_dict)``.

        Values are URL-decoded, parameters without ``=`` map to an empty
        string, and for repeated names the last value wins.
        """
        path, _, query_string = raw_path.partition('?')
        pairs = _urlparse.parse_qsl(query_string, keep_blank_values=True)
        return path, dict(pairs)

    def _apply_update(self, hostname, myip):
        """Bring the record of *hostname* in line with *myip*.

        *myip* of ``None`` means the record should be deleted. Returns the
        HTTP status to send: 200 when the record already matches or was
        changed successfully, 500 when the change did not take effect.
        """
        conf = self.server.conf
        # First look the host up through the system resolver; when it
        # already matches nothing has to be sent.
        if DnsUpdate().get_ip(hostname) == myip:
            return 200
        dns_update = DnsUpdate(
            conf.get('key'),
            conf.get('zone'),
            conf.get('name_server'))
        address = dns_update.get_ip(hostname)
        if address == myip:
            return 200
        if myip:
            log.info("Updating IP for host '%s' from '%s' to '%s'", hostname, address, myip)
            dns_update.set_record(hostname, myip)
        else:
            log.info("Delete host '%s' '%s'", hostname, address)
            dns_update.delete_record(hostname)
        # Read the record back to verify the change took effect.
        if dns_update.get_ip(hostname) != myip:
            log.error("Error updating address")
            return 500
        return 200

    def nic_update(self, query):
        """Handle one update request and return the HTTP status code.

        Args:
            query: dictionary of query parameters; ``system`` and
                ``hostname`` are required, ``myip`` is optional (when
                missing or empty the host record is deleted).

        Returns:
            401 for a failed login, 400 for invalid parameters, 500 when
            the DNS update fails, 200 otherwise.
        """
        if 'login' in self.server.conf:
            if not self.check_login(self.headers.get('Authorization')):
                return 401
        if query.get('system') not in ('noip', 'dyndns'):
            return 400
        hostname = query.get('hostname')
        if not hostname:
            return 400
        # Treat an empty ``myip=`` like a missing one; otherwise the final
        # verification would compare None with '' and always report 500.
        myip = query.get('myip') or None
        if myip and not validate_ip(myip):
            return 400
        try:
            return self._apply_update(hostname, myip)
        except (DnsUpdateError, dns.exception.DNSException, OSError, EOFError) as err:
            # Answer with a status instead of dropping the connection.
            log.error("DNS update for host '%s' failed: %s", hostname, err)
            return 500

    def do_GET(self):
        """Serve GET requests; only ``/nic/update`` with a query is handled."""
        path, query = self._parse_query(self.path)
        status = 404
        if path == '/nic/update' and query:
            status = self.nic_update(query)
        self.send_response(status)
        self.send_header('Connection', 'close')
        self.end_headers()
def main():
    """Load the JSON configuration and run the HTTP server until interrupted.

    The configuration file is given with ``-c``/``--config``; its
    ``host_name`` and ``server_port`` entries select the listening address.
    """
    parser = _argparse.ArgumentParser(
        formatter_class=_argparse.RawTextHelpFormatter)
    parser.add_argument(
        '-c', '--config', type=str, required=True,
        help='configuration file')
    args = parser.parse_args()

    with open(args.config, 'r', encoding='utf-8') as config_file:
        conf = _json.load(config_file)
    if not conf:
        log.info("No configuration")
        return

    address = (conf['host_name'], conf['server_port'])
    web_server = _httpserver.HTTPServer(address, MyServer)
    # Request handlers read the configuration from the server object.
    web_server.conf = conf
    log.info("Server started http://%s:%s", conf['host_name'], conf['server_port'])
    try:
        web_server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket even when serve_forever() fails with
        # something other than Ctrl-C.
        web_server.server_close()
    log.info("Server stopped.")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment