Skip to content

Instantly share code, notes, and snippets.

@Caligatio
Last active March 29, 2023 12:47
Show Gist options
  • Save Caligatio/b975005f26248575aba4709d093cfe4f to your computer and use it in GitHub Desktop.
Save Caligatio/b975005f26248575aba4709d093cfe4f to your computer and use it in GitHub Desktop.
Dynamic DNS script to detect and update A, AAAA, and HTTPS (with ipv4hint and ipv6hint) records at Cloudflare
#!/usr/bin/python3
"""
dyndns_cloudflare
This is a Python 3.8+ script that will attempt to identify a computer's external IPv4 and "static" (non RFC 4941) IPv6
addresses to then update Cloudflare DNS entries. It currently supports A, AAAA, and the new HTTPS DNS record types. For
HTTPS records, it will embed the "ipv4hint" and "ipv6hint" values into the record along with the ALPN specified in the
config. It has optional support for saving the IPv4 and IPv6 addresses from the last run of the program to avoid
unnecessary updates.
**Note that it only updates DNS records: the records must already exist, and it will not delete any records. If you add
more records to the config, make sure you delete the "last_run" file, because the script cannot detect that the config
has changed while the IP addresses have not.**
It was written by Brian Turek (https://github.com/Caligatio) and released under the Unlicense (https://unlicense.org/).
"""
import argparse
import ipaddress
import logging
import pathlib
import subprocess
import sys
from typing import Dict, Final, Iterable, Literal, Optional
import xml.etree.ElementTree as ET
from yaml import load, dump
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
import requests
logger = logging.getLogger(__name__)
def get_ipv4_by_fritz_upnp(
    hostname: str = "fritz.box", port: int = 49000, timeout: float = 10.0
) -> Optional[ipaddress.IPv4Address]:
    """Ask a router for its external IPv4 address with a UPnP ``GetExternalIPAddress`` SOAP call.

    Args:
        hostname: Host name or IP address of the router.
        port: TCP port of the router's UPnP control endpoint.
        timeout: Seconds to wait for the router before giving up.

    Returns:
        The external IPv4 address, or ``None`` when the router cannot be reached, answers with an HTTP error, or
        sends a reply that does not contain a usable IPv4 address.
    """
    # This is some black magic copied from https://wiki.ubuntuusers.de/FritzBox/Skripte/
    soap_request = (
        b"\n"
        b"<?xml version='1.0' encoding='utf-8'?>\n"
        b"<s:Envelope s:encodingStyle='http://schemas.xmlsoap.org/soap/encoding/'"
        b" xmlns:s='http://schemas.xmlsoap.org/soap/envelope/'>\n"
        b"<s:Body>\n"
        b"<u:GetExternalIPAddress xmlns:u='urn:schemas-upnp-org:service:WANIPConnection:1' />\n"
        b"</s:Body>\n"
        b"</s:Envelope>\n"
    )

    try:
        req = requests.post(
            f"http://{hostname}:{port}/igdupnp/control/WANIPConn1",
            headers={
                "Content-Type": 'text/xml; charset="utf-8"',
                "SoapAction": "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress",
            },
            data=soap_request,
            timeout=timeout,
        )
        req.raise_for_status()
    except requests.RequestException:
        # RequestException also covers connection errors and timeouts, so an unreachable router
        # lets the caller fall through to the next lookup method instead of crashing.
        logger.debug("Failed to find IPv4 address via Fritz UPnP")
        return None

    try:
        root = ET.fromstring(req.content)
    except ET.ParseError:
        logger.debug("Failed to find IPv4 address via Fritz UPnP")
        return None

    node = root.find(".//NewExternalIPAddress")
    text = node.text if node is not None else None
    if not text:
        logger.debug("Failed to find IPv4 address via Fritz UPnP")
        return None

    try:
        addr = ipaddress.IPv4Address(text.strip())
    except ValueError:
        logger.debug("Failed to find IPv4 address via Fritz UPnP")
        return None

    # 0.0.0.0 can never be a usable external address; treat it as "not found".
    if addr.is_unspecified:
        logger.debug("Failed to find IPv4 address via Fritz UPnP")
        return None

    logger.debug("Found IPv4 address via Fritz UPnP")
    return addr
def get_ipv4_by_ipify(timeout: float = 10.0) -> Optional[ipaddress.IPv4Address]:
    """Look up the external IPv4 address by querying the ipify web service.

    Args:
        timeout: Seconds to wait for the service before giving up.

    Returns:
        The address reported by ipify, or ``None`` when the request fails (connection error, timeout, HTTP error
        status) or the response body is not a valid IPv4 address.
    """
    try:
        req = requests.get("http://api.ipify.org", timeout=timeout)
        req.raise_for_status()
        # Strip surrounding whitespace so a trailing newline in the body does not break parsing.
        addr = ipaddress.IPv4Address(req.content.decode().strip())
    except (requests.RequestException, ValueError):
        # ValueError covers both undecodable bytes and text that is not an IPv4 address.
        logger.debug("Failed to find IPv4 address via ipify")
        return None

    logger.debug("Found IPv4 address via ipify")
    return addr
def get_static_ipv6(iface: str) -> Optional[ipaddress.IPv6Address]:
    """Read the IPv6 address of a network interface from the output of ``/sbin/ip addr show dev <iface>``.

    The first output line that contains both "scope global" and "mngtmpaddr" but not "deprecated" is used; its
    second whitespace-separated field is expected to be ``<address>/<prefix length>``.

    Args:
        iface: Name of the network interface to inspect.

    Returns:
        The address from the first matching line, or ``None`` if no line matches.
    """
    result = subprocess.run(["/sbin/ip", "addr", "show", "dev", iface], capture_output=True, encoding="utf-8")

    for entry in result.stdout.splitlines():
        is_candidate = "scope global" in entry and "mngtmpaddr" in entry
        if not is_candidate or "deprecated" in entry:
            continue

        fields = entry.split()
        address_text, _prefix_len = fields[1].split("/")
        found = ipaddress.IPv6Address(address_text)
        logger.debug("Found IPv6 address via static interface")
        return found

    logger.debug("Failed to find IPv6 address via static interface")
    return None
def update_cloudflare_a_aaaa_record(
    api_token: str,
    zone_id: str,
    record_id: str,
    record_type: Literal["A", "AAAA"],
    name: str,
    content: str,
    proxied: bool,
    ttl: int = 1,
) -> None:
    """Overwrite an existing A or AAAA record through an HTTP PUT to the Cloudflare v4 API.

    Args:
        api_token: Token sent as the ``Bearer`` credential.
        zone_id: Identifier of the zone that holds the record.
        record_id: Identifier of the DNS record to overwrite.
        record_type: Either ``"A"`` or ``"AAAA"``.
        name: DNS name of the record.
        content: The IP address, as text, to store in the record.
        proxied: Value sent as the record's ``proxied`` flag.
        ttl: Value sent as the record's ``ttl`` (presumably 1 means "automatic" at Cloudflare -- confirm in API docs).

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    logger.info(
        "Setting %s record for %s to %s with ttl %d and proxy flag set to %s", record_type, name, content, ttl, proxied
    )

    endpoint = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record_id}"
    request_headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    }
    payload = {"type": record_type, "name": name, "content": content, "ttl": ttl, "proxied": proxied}

    response = requests.put(endpoint, headers=request_headers, json=payload)
    response.raise_for_status()
def update_cloudflare_https_record(
    api_token: str,
    zone_id: str,
    record_id: str,
    name: str,
    target: str,
    value: str,
    priority: int,
    ttl: int = 1,
) -> None:
    """Overwrite an existing HTTPS record through an HTTP PUT to the Cloudflare v4 API.

    Args:
        api_token: Token sent as the ``Bearer`` credential.
        zone_id: Identifier of the zone that holds the record.
        record_id: Identifier of the DNS record to overwrite.
        name: DNS name of the record.
        target: Sent as the ``target`` field of the record data.
        value: Sent as the ``value`` field of the record data (the caller passes the ALPN and address hints here).
        priority: Sent as the ``priority`` field of the record data.
        ttl: Value sent as the record's ``ttl``.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    logger.info(
        "Setting HTTPS record for %s to %s with value '%s', ttl %d, and priority %d", name, target, value, ttl, priority
    )

    endpoint = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record_id}"
    request_headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
    }
    record_data = {"target": target, "value": value, "priority": priority}
    payload = {
        "type": "HTTPS",
        "name": name,
        "ttl": ttl,
        "data": record_data,
    }

    response = requests.put(endpoint, headers=request_headers, json=payload)
    response.raise_for_status()
def _parse_last_ip(raw, factory):
    """Turn a value read from the last-run file into an address object, or ``None`` if absent or unusable."""
    # Earlier versions of this script stored a missing address as the literal string "None".
    if not raw or raw == "None":
        return None
    try:
        return factory(raw)
    except ValueError:
        logger.warning("Ignoring invalid address %r in last-run file", raw)
        return None


def main(
    api_token: str,
    zone_id: str,
    ipv4_methods: Iterable[Dict],
    ipv6_methods: Iterable[Dict],
    a_records: Iterable[Dict],
    aaaa_records: Iterable[Dict],
    https_records: Iterable[Dict],
    last_run: Optional[pathlib.Path] = None,
) -> int:
    """Detect the current external addresses and push them to the configured Cloudflare records.

    Args:
        api_token: Cloudflare API token passed to the update helpers.
        zone_id: Cloudflare zone identifier passed to the update helpers.
        ipv4_methods: Lookup methods tried in order for IPv4. Each dict needs a ``name`` key ("fritz_upnp",
            "ipify" or "static_iface"); the remaining keys are passed to the lookup function as keyword arguments.
        ipv6_methods: Same as ``ipv4_methods`` but for IPv6.
        a_records: A records to update; each needs ``id``, ``name`` and ``proxied``, and may have ``ttl``.
        aaaa_records: AAAA records to update; same keys as ``a_records``.
        https_records: HTTPS records to update; each needs ``id``, ``name``, ``target`` and ``alpn``, and may
            have ``priority`` and ``ttl``.
        last_run: Optional path of a YAML file holding the addresses found by the previous run. When given, records
            are only updated if an address differs from the stored one, and the file is rewritten at the end.

    Returns:
        0 on success, 1 if no lookup methods are configured, 2 if no records are configured, 3 if every lookup
        method of a configured address family failed.

    Raises:
        KeyError: If a method dict has no ``name`` key or names an unknown method.
        requests.HTTPError: If a Cloudflare update is rejected.
    """
    METHODS_MAPPING: Final = {
        "fritz_upnp": get_ipv4_by_fritz_upnp,
        "ipify": get_ipv4_by_ipify,
        "static_iface": get_static_ipv6,
    }

    last_ips = {"ipv4": None, "ipv6": None}
    if last_run:
        try:
            with last_run.open("r") as f_in:
                # NOTE(review): Loader is PyYAML's full loader, which can construct arbitrary objects.
                # Acceptable only while this file is written exclusively by this script.
                data = load(f_in, Loader=Loader)
        except FileNotFoundError:
            pass
        else:
            # An empty or hand-edited file may not be a mapping; treat that like "no previous run".
            if isinstance(data, dict):
                last_ips = {
                    "ipv4": _parse_last_ip(data.get("ipv4"), ipaddress.IPv4Address),
                    "ipv6": _parse_last_ip(data.get("ipv6"), ipaddress.IPv6Address),
                }
                logger.info("Last ran found IPv4 of %s and IPv6 of %s", last_ips["ipv4"], last_ips["ipv6"])

    if not any([ipv4_methods, ipv6_methods]):
        logger.critical("At least one of ipv4_methods or ipv6_methods must be defined")
        return 1

    if not any([a_records, aaaa_records, https_records]):
        logger.critical("At least one of A, AAAA, or HTTPS records must be defined")
        return 2

    addrs = {"ipv4": None, "ipv6": None}
    for addr_type, methods in (("ipv4", ipv4_methods), ("ipv6", ipv6_methods)):
        if not methods:
            # An address family without configured methods is simply skipped, not an error.
            continue
        for method in methods:
            # Work on a copy so the caller's config dict is not mutated by the pop below.
            method_kwargs = dict(method)
            method_name = method_kwargs.pop("name")
            addr = METHODS_MAPPING[method_name](**method_kwargs)
            if addr:
                addrs[addr_type] = addr
                logger.info("Found %s address of %s", addr_type, addr)
                break
        else:
            # The loop ran to completion without a break: no method produced an address.
            logger.critical("All %s address methods failed", addr_type)
            return 3

    if addrs["ipv4"] and last_ips["ipv4"] != addrs["ipv4"]:
        logger.info("IPv4 address changed!")
        for record in a_records:
            update_cloudflare_a_aaaa_record(
                api_token,
                zone_id,
                record["id"],
                "A",
                record["name"],
                str(addrs["ipv4"]),
                record["proxied"],
                record.get("ttl", 1),
            )
    else:
        logger.info("No IPv4 change")

    if addrs["ipv6"] and last_ips["ipv6"] != addrs["ipv6"]:
        logger.info("IPv6 address changed!")
        for record in aaaa_records:
            update_cloudflare_a_aaaa_record(
                api_token,
                zone_id,
                record["id"],
                "AAAA",
                record["name"],
                str(addrs["ipv6"]),
                record["proxied"],
                record.get("ttl", 1),
            )
    else:
        logger.info("No IPv6 change")

    # HTTPS records carry hints for both families, so they are refreshed when either address changed.
    if last_ips["ipv4"] != addrs["ipv4"] or last_ips["ipv6"] != addrs["ipv6"]:
        for record in https_records:
            contents = [f'alpn="{record["alpn"]}"']
            for hint_type, hint_addr in addrs.items():
                if hint_addr:
                    contents.append(f'{hint_type}hint="{hint_addr}"')
            update_cloudflare_https_record(
                api_token,
                zone_id,
                record["id"],
                record["name"],
                record["target"],
                " ".join(contents),
                record.get("priority", 1),
                record.get("ttl", 1),
            )

    if last_run:
        # Store a missing address as YAML null rather than the string "None", which could not be
        # parsed back into an address on the next run.
        state = {family: (str(addr) if addr is not None else None) for family, addr in addrs.items()}
        with last_run.open("w") as f_out:
            f_out.write(dump(state, Dumper=Dumper))

    return 0
def use_config(config_file: pathlib.Path) -> int:
    """Load the YAML configuration file and run ``main`` with the settings it contains.

    Args:
        config_file: Path of the YAML configuration file.

    Returns:
        The exit code returned by ``main``.

    Raises:
        KeyError: If ``api_token``, ``zone_id`` or ``records`` is missing from the configuration.
    """
    with config_file.open("rb") as stream:
        settings = load(stream, Loader=Loader)

    # Only a non-empty "last_run" setting is turned into a path; anything falsy is passed through.
    state_file = settings.get("last_run")
    if state_file:
        state_file = pathlib.Path(state_file)

    api_token = settings["api_token"]
    zone_id = settings["zone_id"]
    ipv4_methods = settings.get("ipv4_methods", [])
    ipv6_methods = settings.get("ipv6_methods", [])
    records = settings["records"]

    return main(
        api_token=api_token,
        zone_id=zone_id,
        ipv4_methods=ipv4_methods,
        ipv6_methods=ipv6_methods,
        a_records=records.get("A", []),
        aaaa_records=records.get("AAAA", []),
        https_records=records.get("HTTPS", []),
        last_run=state_file,
    )
def cli() -> None:
    """Parse the command line, configure logging, run the update and exit with its return code."""
    level_names = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]

    parser = argparse.ArgumentParser(description="Updates dynamic DNS entries at Cloudflare")
    parser.add_argument("--config", required=True, type=pathlib.Path, help="Path to configuration file")
    parser.add_argument("--logging", choices=level_names, default="INFO", help="Desired logging level")
    options = parser.parse_args()

    # The choices above are exactly the level constant names of the logging module.
    chosen_level = getattr(logging, options.logging)
    logging.basicConfig(level=chosen_level, format="%(asctime)s - %(levelname)s - %(message)s")

    # Keep urllib3 at INFO or quieter, even when this script itself logs at DEBUG.
    urllib3_level = max(chosen_level, logging.INFO)
    logging.getLogger("urllib3").setLevel(urllib3_level)

    exit_code = use_config(options.config)
    sys.exit(exit_code)


if __name__ == "__main__":
    cli()
---
api_token: REDACTED_API_TOKEN
zone_id: REDACTED_ZONE_ID
ipv4_methods:
- name: fritz_upnp
hostname: 192.168.178.1
- name: ipify
ipv6_methods:
- name: static_iface
iface: enp1s0
last_run: /var/run/dyndns_cloudflare
records:
A:
- name: srv.example.com
id: REDACTED_RECORD_ID
proxied: false
AAAA:
- name: srv.example.com
id: REDACTED_RECORD_ID
proxied: false
HTTPS:
- name: test1.example.com
id: REDACTED_RECORD_ID
target: srv.example.com
alpn: h3,h2
- name: test2.example.com
id: REDACTED_RECORD_ID
target: srv.example.com
alpn: h3,h2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment