Skip to content

Instantly share code, notes, and snippets.

@Kedstar99
Created October 25, 2021 15:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Kedstar99/2d8ab0e9bec3a5629562b4ab010a1847 to your computer and use it in GitHub Desktop.
Save Kedstar99/2d8ab0e9bec3a5629562b4ab010a1847 to your computer and use it in GitHub Desktop.
cloudflare-ddns
#!/bin/env python3
import requests
import json
import time
import sys
import ipaddress
import logging
from systemd.journal import JournalHandler
# Module logger; records are handed to JournalHandler (imported from systemd.journal).
log = logging.getLogger('demo')
log.addHandler(JournalHandler())
# INFO threshold: log.debug() calls made through this logger are dropped.
log.setLevel(logging.INFO)
# Cloudflare credentials; sent as the X-Auth-Email / X-Auth-Key request headers.
dns_cloudflare_email = 'redacted'
dns_cloudflare_api_key = 'redacted'
# Record name whose A / AAAA DNS records this script keeps up to date.
domain = 'redacted'
# Your NIC name, e.g. eth0 or enp30s0. Usually it's fine to leave it empty.
# NOTE(review): nothing in the code visible here reads nic_device - confirm it is still needed.
nic_device = ''
def _set_cloudflare_auth_headers():
    """Install the Cloudflare e-mail / API-key auth headers on the shared session ``s``."""
    s.headers.update({
        'X-Auth-Email': dns_cloudflare_email,
        'X-Auth-Key': dns_cloudflare_api_key,
    })
# Global requests session, shared by the Cloudflare API calls below so they
# can reuse one keep-alive connection. The auth headers are installed once,
# right after the session is created.
s = requests.Session()
_set_cloudflare_auth_headers()
def get_ip_addr():
    """Look up this host's public IPv4 and IPv6 addresses.

    For each address family a list of plain-text "what is my IP" services is
    tried in order; the first one that answers with a valid address of the
    expected IP version wins, and the remaining services are not contacted.

    Returns:
        A ``(v4, v6)`` tuple of ``ipaddress`` address objects. An element is
        ``None`` when no service produced a usable address for that family.
    """
    # Without a timeout a single unresponsive service would block the whole
    # update loop forever; give up after a few seconds and try the next URL.
    request_timeout = 10

    services = {
        4: ['http://ipv4.icanhazip.com', 'https://api.ipify.org', 'https://ipv4bot.whatismyipaddress.com'],
        6: ['http://ipv6.icanhazip.com', 'https://api6.ipify.org', 'https://ipv6bot.whatismyipaddress.com'],
    }

    def validate_resp(url, version):
        """Return the address served by *url* if it has IP *version*, else None."""
        try:
            addr = requests.get(url, timeout=request_timeout)
            if addr.status_code == 200:
                res = ipaddress.ip_address(addr.content.strip().decode('ascii'))
                if res.version == version:
                    return res
        except Exception:
            # Best effort: a network failure or an unparsable body only means
            # "move on to the next service".
            log.error('url parsing failed for {0}'.format(url))
        return None

    def first_valid(version):
        """Query the services for *version* lazily and return the first hit."""
        candidates = (validate_resp(url, version) for url in services[version])
        return next((ip for ip in candidates if ip is not None), None)

    v4 = first_valid(4)
    v6 = first_valid(6)
    log.info('Public ip identified as {v4}, {v6}'.format(v4=v4, v6=v6))
    return (v4, v6)
def _get_cloudflare_results(r: requests.Response) -> list:
    """Return the ``result`` entry of a Cloudflare API response.

    Raises ``requests.HTTPError`` (via ``raise_for_status``) for an error
    status; yields an empty list when the JSON body has no ``result`` key.
    """
    r.raise_for_status()
    payload = r.json()
    return payload.get('result', [])
def _get_cloudflare_result(r: requests.Response) -> dict:
    """Return the single entry of a Cloudflare response's result list.

    Asserts that the list holds exactly one entry.
    """
    entries = _get_cloudflare_results(r)
    assert len(entries) == 1, 'there should be exactly one result'
    only_entry = entries[0]
    return only_entry
def fetch_cloudflare_account():
    """Fetch the id and name of the account visible to the configured credentials.

    Returns:
        An ``(account_id, account_name)`` tuple; asserts that both are present.
    """
    response = s.get('https://api.cloudflare.com/client/v4/accounts')
    account = _get_cloudflare_result(response)
    account_id = account.get('id')
    account_name = account.get('name')
    assert not (account_id is None or account_name is None)
    return account_id, account_name
def fetch_cloundflare_zone_id(account_id:str, account_name:str, domain:str) -> str:
    """Return the id of the active Cloudflare zone that contains *domain*.

    The zone name is taken to be the last two labels of *domain*
    (``host.example.com`` -> ``example.com``).

    Args:
        account_id: Cloudflare account id used to filter the zone listing.
        account_name: Accepted for backward compatibility; the id alone
            already identifies the account, so the name is not sent.
        domain: Record name, with at least two dot-separated labels.

    Returns:
        The zone id string.

    Raises:
        AssertionError: if *domain* has fewer than two labels, the listing
            does not contain exactly one zone, or the zone has no id.
        requests.HTTPError: if the API answers with an error status.
    """
    root_domain = domain.rsplit('.', 2)[-2:]
    assert len(root_domain)==2
    params = {
        'name': '.'.join(root_domain),
        'status': 'active',
        # Filter by owning account. Authentication already travels in the
        # session headers, so the account id must not be sent under the
        # header names 'X-Auth-Key' / 'X-Auth-Email' as query parameters.
        'account.id': account_id,
    }
    r = s.get('https://api.cloudflare.com/client/v4/zones', params=params)
    res = _get_cloudflare_result(r)
    zone_id = res.get('id')
    assert zone_id is not None
    return zone_id
def fetch_cloundflare_dns(zone_id: str, domain: str, record_type: str = 'AAAA'):
    """List the DNS records named *domain* in a zone and pick the one of *record_type*.

    Every returned record is logged. When several records have the requested
    type, the last one in the listing is used.

    Returns:
        A ``(dns_id, addr)`` tuple for the matching record, or ``('', '')``
        when no record of *record_type* exists.
    """
    endpoint = 'https://api.cloudflare.com/client/v4/zones/{}/dns_records'.format(zone_id)
    response = s.get(endpoint, params={'name': domain})
    records = _get_cloudflare_results(response)

    found_id, found_addr = '', ''
    log.info('Effected DNS records:')
    for record in records:
        summary = '{}\t{}\t{}\t{}'.format(
            record.get('name'), record.get('type'), record.get('content'), record.get('modified_on'),
        )
        log.info(summary)
        if record.get('type') != record_type:
            continue
        found_id = record.get('id')
        found_addr = record.get('content')
    return found_id, found_addr
def _same_address(left, right) -> bool:
    """Return True when *left* and *right* denote the same IP address."""
    try:
        return ipaddress.ip_address(str(left)) == ipaddress.ip_address(str(right))
    except ValueError:
        # At least one side is not a parsable address (e.g. the '' that
        # stands for "no record yet"); fall back to plain text comparison.
        return str(left) == str(right)


def refresh_cloudflare_dns(account_id:str, account_name:str, domain:str, addr:str, record_type:str = 'AAAA'):
    """Make the *record_type* record for *domain* point at *addr*.

    Creates the record when it does not exist, patches it when its content
    differs from *addr*, and does nothing when it already matches.

    Args:
        account_id: Cloudflare account id, forwarded to the zone lookup.
        account_name: Cloudflare account name, forwarded to the zone lookup.
        domain: Record name to create or update.
        addr: Desired address; a string or an ``ipaddress`` address object.
        record_type: DNS record type, ``'A'`` or ``'AAAA'``.

    Raises:
        requests.HTTPError: if any Cloudflare API call returns an error status.
    """
    zone_id = fetch_cloundflare_zone_id(account_id, account_name, domain)
    dns_id, effected_addr = fetch_cloundflare_dns(zone_id, domain, record_type)
    # Callers pass an ipaddress object while Cloudflare returns text, so a
    # plain == between the two is never true; compare the parsed addresses.
    if _same_address(effected_addr, addr):
        log.info('No need to update.')
    elif dns_id == '':
        log.info('There is no {} record for {} yet. Create one.'.format(record_type, domain))
        url = 'https://api.cloudflare.com/client/v4/zones/{}/dns_records'.format(zone_id)
        data = {'type': record_type, 'name': domain, 'content': str(addr), 'ttl': 1}
        r = s.post(url, json=data)
        r.raise_for_status()
    else:
        url = 'https://api.cloudflare.com/client/v4/zones/{}/dns_records/{}'.format(zone_id, dns_id)
        data = {'type': record_type, 'name': domain, 'content': str(addr)}
        r = s.patch(url, json=data)
        r.raise_for_status()
    return
def refresh_ip(ip, old_ip, is_ipv4):
    """Update the DNS record for one address family if the public IP changed.

    Args:
        ip: Current public address (``ipaddress`` object) or ``None`` when
            it could not be determined.
        old_ip: Address seen on the previous run, compared against *ip*.
        is_ipv4: ``True`` to manage the ``A`` record, ``False`` for ``AAAA``.

    Returns:
        Seconds to wait before the next check: ``60 * 60`` when the record is
        in sync (updated or unchanged), ``10 * 60`` when the address was
        unusable or the Cloudflare API returned an HTTP error.

    Any other exception is logged and terminates the process with status 1.
    """
    retry_delay = 10 * 60
    synced_delay = 60 * 60

    if ip is None:
        log.debug("Issue with local internet connection")
        return retry_delay
    expected_version = 4 if is_ipv4 else 6
    expected_record = 'A' if is_ipv4 else 'AAAA'
    if ip.version != expected_version:
        log.debug("Ip gathered is not a correct version : {0}, expected : {1}".format(ip, expected_version))
        return retry_delay
    if old_ip == ip:
        return synced_delay

    log.info('Detect local IP changed: from "{}" to "{}"'.format(old_ip, ip))
    log.info('Refresh remote DNS record.')
    try:
        account_id, account_name = fetch_cloudflare_account()
        refresh_cloudflare_dns(account_id, account_name, domain, ip, record_type=expected_record)
        return synced_delay
    except requests.HTTPError as e:
        # An error body is not guaranteed to be JSON (e.g. a proxy's HTML
        # error page); decoding must not raise out of this handler.
        response = e.response
        if response is None:
            detail = ''
        else:
            try:
                detail = response.json()
            except ValueError:
                detail = response.text
        log.info('{}\n{}'.format(e, detail))
        return retry_delay
    except Exception as e:
        # Unexpected failure: record the traceback in the journal as well as
        # on stdout, then stop the process.
        log.exception('Unexpected error while refreshing DNS record')
        print(e)
        sys.exit(1)
if __name__ == "__main__":
    # refresh_ip() returns this delay only when the DNS record is known to be
    # in sync (just updated, or unchanged); any shorter delay means "retry".
    synced_delay = 60 * 60
    # Last addresses known to be published, as [v4, v6]. They must be updated
    # after a successful refresh; otherwise every cycle looks like an IP
    # change and hits the Cloudflare API again.
    last_ip_addrs = ['', '']
    while True:
        log.info("fetching fresh local ips")
        curr_ips = get_ip_addr()
        delayv6 = refresh_ip(curr_ips[1], last_ip_addrs[1], False)
        if delayv6 == synced_delay:
            last_ip_addrs[1] = curr_ips[1]
        delayv4 = refresh_ip(curr_ips[0], last_ip_addrs[0], True)
        if delayv4 == synced_delay:
            last_ip_addrs[0] = curr_ips[0]
        time.sleep(max(delayv4, delayv6))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment