Created
October 25, 2021 15:13
-
-
Save Kedstar99/2d8ab0e9bec3a5629562b4ab010a1847 to your computer and use it in GitHub Desktop.
cloudflare-ddns
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
import requests | |
import json | |
import time | |
import sys | |
import ipaddress | |
import logging | |
from systemd.journal import JournalHandler | |
# Send this script's log records (INFO and above) to the systemd journal.
log = logging.getLogger('demo')
log.setLevel(logging.INFO)
log.addHandler(JournalHandler())

# Cloudflare API credentials; installed as auth headers on the shared session.
dns_cloudflare_email = 'redacted'
dns_cloudflare_api_key = 'redacted'

# DNS name whose A / AAAA records are kept in sync with the public address.
domain = 'redacted'

# your NIC name like eth0, enp30s0. Usually it's fine to leave it empty.
# NOTE(review): not referenced by any code visible in this file.
nic_device = ''
def _set_cloudflare_auth_headers():
    """Install the Cloudflare credentials as default headers on the session ``s``.

    ``s`` is resolved when the function is called, so the module-level session
    must already exist at that point.
    """
    s.headers.update({
        'X-Auth-Email': dns_cloudflare_email,
        'X-Auth-Key': dns_cloudflare_api_key,
    })
# Module-wide requests session: one keep-alive connection shared by all
# Cloudflare API calls, with the auth headers attached right after creation.
s = requests.Session()
_set_cloudflare_auth_headers()
def get_ip_addr(timeout=10):
    """Discover this host's public IPv4 and IPv6 addresses.

    For each address family a fixed list of "what is my IP" services is tried
    in order. The first service that answers with a valid address of the
    expected family wins; the remaining services are not contacted.

    Args:
        timeout: Seconds to wait on each lookup service before giving up on
            it. Without a timeout a single unresponsive service would block
            the whole update loop indefinitely.

    Returns:
        A ``(v4, v6)`` tuple of ``ipaddress`` address objects. An element is
        ``None`` when no service produced a valid address for that family.
    """
    def validate_resp(url, version):
        """Return the address reported by ``url`` if it is of ``version``, else None."""
        try:
            # NOTE: keep this on plain requests.get. The module-level session
            # carries the Cloudflare X-Auth-* headers, which must not be sent
            # to these third-party services.
            addr = requests.get(url, timeout=timeout)
            if addr.status_code == 200:
                res = ipaddress.ip_address(addr.content.strip().decode('ascii'))
                if res.version == version:
                    return res
        except Exception:
            # Best effort: any failure just means "try the next service".
            log.error('url parsing failed for {0}'.format(url))
        return None

    def first_valid(urls, version):
        """Try ``urls`` in order and return the first validated address, else None."""
        for url in urls:
            found = validate_resp(url, version)
            if found is not None:
                return found
        return None

    v4 = first_valid(
        ['http://ipv4.icanhazip.com', 'https://api.ipify.org', 'https://ipv4bot.whatismyipaddress.com'],
        4,
    )
    v6 = first_valid(
        ['http://ipv6.icanhazip.com', 'https://api6.ipify.org', 'https://ipv6bot.whatismyipaddress.com'],
        6,
    )
    log.info('Public ip identified as {v4}, {v6}'.format(v4=v4, v6=v6))
    return (v4, v6)
def _get_cloudflare_results(r:requests.Response) -> list: | |
r.raise_for_status() | |
return r.json().get('result', []) | |
def _get_cloudflare_result(r: requests.Response) -> dict:
    """Return the single entry of a Cloudflare response's result list.

    Delegates status checking and JSON extraction to
    ``_get_cloudflare_results`` and asserts that exactly one entry came back.
    """
    entries = _get_cloudflare_results(r)
    assert len(entries) == 1, 'there should be exactly one result'
    return entries[0]
def fetch_cloudflare_account():
    """Fetch the id and name of the Cloudflare account the credentials can see.

    Returns:
        An ``(account_id, account_name)`` tuple. Both values are asserted to
        be present in the single account entry returned by the API.
    """
    response = s.get('https://api.cloudflare.com/client/v4/accounts')
    account = _get_cloudflare_result(response)
    account_id = account.get('id')
    account_name = account.get('name')
    assert not (account_id is None or account_name is None)
    return account_id, account_name
def fetch_cloundflare_zone_id(account_id: str, account_name: str, domain: str) -> str:
    """Look up the id of the active Cloudflare zone that contains ``domain``.

    The zone name is taken to be the last two labels of ``domain``
    (``host.example.com`` -> ``example.com``). Zones under a multi-label
    suffix such as ``example.co.uk`` are therefore not resolved correctly.

    Args:
        account_id: Id of the Cloudflare account owning the zone.
        account_name: Name of that account.
        domain: Record name whose zone is wanted; needs at least two labels.

    Returns:
        The zone id reported by the Cloudflare "list zones" endpoint.

    Raises:
        AssertionError: if ``domain`` has fewer than two labels, the API does
            not return exactly one zone, or the zone has no id.
    """
    root_domain = domain.rsplit('.', 2)[-2:]
    assert len(root_domain) == 2
    params = {
        'name': '.'.join(root_domain),
        'status': 'active',
        # Restrict the search to the owning account. These are query filters
        # of the list-zones endpoint; the X-Auth-* names belong to the
        # credential headers already set on the session, not to the query.
        'account.id': account_id,
        'account.name': account_name,
    }
    r = s.get('https://api.cloudflare.com/client/v4/zones', params=params)
    res = _get_cloudflare_result(r)
    zone_id = res.get('id')
    assert zone_id is not None
    return zone_id
def fetch_cloundflare_dns(zone_id: str, domain: str, record_type: str = 'AAAA'):
    """List the DNS records named ``domain`` and pick the one of ``record_type``.

    Every record returned for the name is logged. When several records have
    the requested type, the last one in the response wins.

    Returns:
        A ``(dns_id, addr)`` tuple with the record id and its content, or
        ``('', '')`` when no record of ``record_type`` exists.
    """
    records_url = 'https://api.cloudflare.com/client/v4/zones/{}/dns_records'.format(zone_id)
    response = s.get(records_url, params={'name': domain})
    records = _get_cloudflare_results(response)

    match_id = ''
    match_content = ''
    log.info('Effected DNS records:')
    for record in records:
        columns = (
            record.get('name'),
            record.get('type'),
            record.get('content'),
            record.get('modified_on'),
        )
        log.info('{}\t{}\t{}\t{}'.format(*columns))
        if record.get('type') != record_type:
            continue
        match_id = record.get('id')
        match_content = record.get('content')
    return match_id, match_content
def _same_address(published, wanted):
    """Tell whether two textual IP addresses denote the same address."""
    try:
        return ipaddress.ip_address(published) == ipaddress.ip_address(wanted)
    except ValueError:
        # Not parseable as an address (e.g. '' when no record exists yet):
        # fall back to plain text comparison.
        return published == wanted


def refresh_cloudflare_dns(account_id:str, account_name:str, domain:str, addr:str, record_type:str = 'AAAA'):
    """Make the ``record_type`` record of ``domain`` point at ``addr``.

    Creates the record when none exists, patches it when its content differs,
    and does nothing when it already holds ``addr``.

    Args:
        account_id: Id of the Cloudflare account owning the zone.
        account_name: Name of that account.
        domain: Record name to create or update.
        addr: Desired address. It is converted with ``str()``, so an
            ``ipaddress`` object (which is what ``refresh_ip`` passes) works
            as well as a string.
        record_type: DNS record type, ``'A'`` or ``'AAAA'``.

    Raises:
        requests.HTTPError: if Cloudflare rejects one of the API calls.
    """
    zone_id = fetch_cloundflare_zone_id(account_id, account_name, domain)
    dns_id, effected_addr = fetch_cloundflare_dns(zone_id, domain, record_type)
    # Compare as addresses: the API returns a string while the caller passes
    # an ipaddress object, and those two never compare equal with ==.
    wanted = str(addr)
    records_url = 'https://api.cloudflare.com/client/v4/zones/{}/dns_records'.format(zone_id)
    if _same_address(effected_addr, wanted):
        log.info('No need to update.')
    elif dns_id == '':
        log.info('There is no {} record for {} yet. Create one.'.format(record_type, domain))
        data = {'type': record_type, 'name': domain, 'content': wanted, 'ttl': 1}
        r = s.post(records_url, json=data)
        r.raise_for_status()
    else:
        data = {'type': record_type, 'name': domain, 'content': wanted}
        r = s.patch('{}/{}'.format(records_url, dns_id), json=data)
        r.raise_for_status()
def refresh_ip(ip, old_ip, is_ipv4):
    """Push ``ip`` to Cloudflare if it differs from ``old_ip``.

    Args:
        ip: Current public address as an ``ipaddress`` object, or ``None``
            when it could not be determined.
        old_ip: Address seen on the previous run, compared with ``!=``.
        is_ipv4: ``True`` to maintain the ``A`` record (IPv4), ``False`` for
            the ``AAAA`` record (IPv6).

    Returns:
        Seconds to wait before the next check: ``60 * 60`` when the record is
        in sync (just refreshed, or address unchanged), ``10 * 60`` when the
        address is unusable or the Cloudflare request failed.

    Exits the process with status 1 on any unexpected (non-``requests``) error.
    """
    retry_delay = 10 * 60
    in_sync_delay = 60 * 60

    if ip is None:
        log.debug("Issue with local internet connection")
        return retry_delay

    expected_version = 4 if is_ipv4 else 6
    expected_record = 'A' if is_ipv4 else 'AAAA'
    if ip.version != expected_version:
        log.debug("Ip gathered is not a correct version : {0}, expected : {1}".format(ip, expected_version))
        return retry_delay

    if old_ip == ip:
        return in_sync_delay

    log.info('Detect local IP changed: from "{}" to "{}"'.format(old_ip, ip))
    log.info('Refresh remote DNS record.')
    try:
        account_id, account_name = fetch_cloudflare_account()
        refresh_cloudflare_dns(account_id, account_name, domain, ip, record_type=expected_record)
    except requests.HTTPError as e:
        # The error body is not guaranteed to be JSON; decoding it must not
        # raise a second exception out of this handler.
        try:
            detail = e.response.json()
        except (ValueError, AttributeError):
            detail = getattr(e.response, 'text', '')
        log.info('{}\n{}'.format(e, detail))
        return retry_delay
    except requests.RequestException as e:
        # Timeouts, DNS failures and dropped connections are transient:
        # retry later instead of terminating the daemon.
        log.error('Cloudflare request failed: {}'.format(e))
        return retry_delay
    except Exception as e:
        print(e)
        sys.exit(1)
    return in_sync_delay
if __name__ == "__main__":
    # refresh_ip() returns this delay only when the DNS record is known to
    # match the current address (just updated, or unchanged since last run).
    in_sync_delay = 60 * 60
    # Addresses last confirmed as published, indexed like get_ip_addr(): [v4, v6].
    last_ip_addrs = ['', '']
    while True:
        log.info("fetching fresh local ips")
        curr_ips = get_ip_addr()
        delayv6 = refresh_ip(curr_ips[1], last_ip_addrs[1], False)
        delayv4 = refresh_ip(curr_ips[0], last_ip_addrs[0], True)
        # Remember an address only once it is confirmed in sync; otherwise
        # every iteration looks like an IP change and hits the Cloudflare
        # API again, while a failed update must still be retried next time.
        for idx, delay in ((1, delayv6), (0, delayv4)):
            if curr_ips[idx] is not None and delay == in_sync_delay:
                last_ip_addrs[idx] = curr_ips[idx]
        time.sleep(max(delayv4, delayv6))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment