Skip to content

Instantly share code, notes, and snippets.

@pavelrevak
Last active June 9, 2023 12:39
Show Gist options
  • Save pavelrevak/02df7cadbd37a41bf972b16a0f796625 to your computer and use it in GitHub Desktop.
Save pavelrevak/02df7cadbd37a41bf972b16a0f796625 to your computer and use it in GitHub Desktop.
Simple dynamic DNS updater, emulate dyndns and noip server API, call nsupdate, without special dependencies, only dnspython
"""Dynamic DNS updater
"""
import sys
import base64 as _base64
import argparse as _argparse
import json as _json
import logging as _logging
import http.server as _httpserver
# dnspython
import dns.resolver
import dns.tsigkeyring
import dns.update
import dns.query
"""
config.json example:
{
"name_server": "1.2.3.4",
"zone": "myzone.com",
"key": {
"myzone.com": [
"hmac-sha512",
"mysecretkey=="
]
},
"login": [
"login",
"password"
],
"host_name": "127.0.0.1",
"server_port": 6789
}
"""
_logging.basicConfig(format='%(levelname).1s: %(message)s (%(filename)s:%(lineno)s)')
log = _logging.getLogger('dyndnsupdate')
log.setLevel(_logging.DEBUG)
class DnsUpdateError(Exception):
pass
class DnsUpdate:
@staticmethod
def fix_name(name):
if name is None:
return name
if name.endswith('.'):
return name
return name + '.'
def __init__(self, key=None, zone=None, nameserver=None):
if key:
self._keyring = dns.tsigkeyring.from_text(key)
else:
self._keyring = None
self._zone = self.fix_name(zone)
if nameserver is None:
resolver = dns.resolver.Resolver(configure=True)
if not resolver.nameservers:
raise DnsUpdateError('No DNS server found.')
nameserver = resolver.nameservers[0]
else:
resolver = dns.resolver.Resolver(configure=False)
resolver.nameservers = [nameserver]
self._resolver = resolver
self._nameserver = nameserver
def get_ip(self, host, entry='A'):
try:
result = self._resolver.resolve(host, entry)
except dns.exception.DNSException:
return None
return result[0].to_text()
def set_record(self, hostname, address, entry='A', ttl=300, nameserver=None):
if nameserver is None:
nameserver = self._nameserver
hostname = self.fix_name(hostname)
update = dns.update.Update(self._zone, keyring=self._keyring)
update.replace(hostname, ttl, entry, address)
response = dns.query.tcp(update, nameserver)
return response
def delete_record(self, hostname, entry='A', nameserver=None):
if nameserver is None:
nameserver = self._nameserver
hostname = self.fix_name(hostname)
update = dns.update.Update(self._zone, keyring=self._keyring)
update.delete(hostname, entry)
response = dns.query.tcp(update, nameserver)
return response
def validate_ip(address):
parts = address.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit():
return False
num = int(part)
if not 0 <= num <= 255:
return False
return True
class MyServer(_httpserver.BaseHTTPRequestHandler):
def check_login(self, auth):
if not auth:
return False
if not auth.startswith('Basic '):
return False
auth = auth.lstrip('Basic ')
try:
auth = _base64.b64decode(auth)
except _base64.binascii.Error as err:
log.error(err)
return False
try:
auth = auth.decode('utf-8')
except UnicodeDecodeError as err:
log.error(err)
return False
if ':' not in auth:
return False
login = auth.split(':', 1)
return login == self.server.conf['login']
def nic_update(self, query):
if 'login' in self.server.conf:
if not self.check_login(self.headers.get('Authorization')):
return 401
system = query.get('system')
if system not in ('noip', 'dyndns'):
return 400
myip = query.get('myip')
hostname = query.get('hostname')
if not hostname:
return 400
if myip and not validate_ip(myip):
return 400
dns_update = DnsUpdate()
address = dns_update.get_ip(hostname)
if address == myip:
return 200
dns_update = DnsUpdate(
self.server.conf.get('key'),
self.server.conf.get('zone'),
self.server.conf.get('name_server'))
address = dns_update.get_ip(hostname)
if address == myip:
return 200
if myip:
log.info("Updating IP for host '%s' from '%s' to '%s'", hostname, address, myip)
dns_update.set_record(hostname, myip)
else:
log.info("Delete host '%s' '%s'", hostname, address)
dns_update.delete_record(hostname)
address = dns_update.get_ip(hostname)
if address != myip:
log.error("Error updating address")
return 500
return 200
def do_GET(self):
path = self.path
query = {}
if '?' in self.path:
path, query = self.path.split('?', 1)
query = dict(tuple(q.split('=', 1)) for q in query.split('&'))
result = 404
if path == '/nic/update' and query:
result = self.nic_update(query)
self.send_response(result)
self.send_header('Connection', 'close')
self.end_headers()
def main():
"""Main"""
parser = _argparse.ArgumentParser(
formatter_class=_argparse.RawTextHelpFormatter)
parser.add_argument(
'-c', '--config', type=str, required=True,
help='configuration file')
args = parser.parse_args()
conf = {}
with open(args.config, 'r', encoding='utf-8') as config_file:
conf = _json.loads(config_file.read())
if not conf:
log.info("No configuration")
return
web_server = _httpserver.HTTPServer(
(conf['host_name'], conf['server_port']),
MyServer)
web_server.conf = conf
log.info("Server started http://%s:%s", conf['host_name'], conf['server_port'])
try:
web_server.serve_forever()
except KeyboardInterrupt:
pass
web_server.server_close()
log.info("Server stopped.")
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment