Created
March 26, 2020 08:12
-
-
Save return0927/c6a64c9534c7020d41bda2baaa219095 to your computer and use it in GitHub Desktop.
Update Cloudflare A record with external IP. Use this with crontab(Linux/Unix) or Task Scheduler (in Windows)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Cloudflare DDNS Python script
by Eunhak Lee (return0927) in 2020
""" | |
import json | |
import logging | |
import urllib.request as request | |
import urllib.error | |
# --- User configuration ------------------------------------------------------
LOGGING_LEVEL = "DEBUG"  # level name handed to Logger.setLevel() below
ACCOUNT_EMAIL = ""  # sent as the X-Auth-Email header on every API request
ACCOUNT_TOKEN = ""  # sent as the Bearer token in the Authorization header
ZONE_NAME = ""  # Domain name
RECORD_NAME = ""  # Record name as full domain

# --- Endpoints ---------------------------------------------------------------
API_HOST = "https://api.cloudflare.com/client/v4"
# Queried over HTTPS so the reported address cannot be altered in transit;
# whatever this endpoint returns is what gets written into the DNS record.
IP_RESOLVER_HOST = "https://ipinfo.io/ip"

# Set logger
log = logging.getLogger("cloudflare_ddns")
log.setLevel(LOGGING_LEVEL)

# %(message)s is the fully rendered message (format string merged with its
# arguments); %(msg)s would print the raw, un-interpolated format string.
formatter = logging.Formatter("[%(asctime)s][%(levelname)s] L%(lineno)d > %(message)s")

# Every record goes both to the console and to a log file in the current
# working directory.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
log.addHandler(stream_handler)

file_handler = logging.FileHandler("cloudflare-ddns.log")
file_handler.setFormatter(formatter)
log.addHandler(file_handler)
def get_ip(timeout: float = 10.0):
    """Return the address reported by the external IP resolver.

    Sends a GET request to ``IP_RESOLVER_HOST`` and returns the response body
    decoded to text with surrounding whitespace removed.

    Args:
        timeout: Seconds to wait on the network before giving up, so an
            unattended (cron / Task Scheduler) run cannot hang forever.

    Returns:
        The resolver's response body as a stripped string.

    Raises:
        urllib.error.URLError: If the request fails (``HTTPError`` for
            non-success HTTP statuses).
        OSError: If the connection breaks or times out while reading.
    """
    req = request.Request(IP_RESOLVER_HOST)
    log.info("Resolving IP")
    log.debug(f"Resolving IP host -> {req.full_url}")

    # The context manager closes the connection even if read() raises.
    with request.urlopen(req, timeout=timeout) as resp:
        resp_data = resp.read()
    log.debug(f"Bare IP Response: {resp_data}")

    bare_ip = resp_data.decode().strip()
    log.info(f"Resolved IP: {bare_ip}")
    return bare_ip
def resolve_zone(timeout: float = 10.0):
    """Look up the Cloudflare zone identifier for ``ZONE_NAME``.

    Queries ``{API_HOST}/zones`` filtered by name and picks the returned zone
    whose ``name`` equals ``ZONE_NAME`` exactly.

    Args:
        timeout: Seconds to wait on the network before giving up.

    Returns:
        The ``id`` field of the matching zone.

    Raises:
        LookupError: If the API response contains no zone named ``ZONE_NAME``.
        urllib.error.URLError: If the request fails (``HTTPError`` for
            non-success HTTP statuses).
    """
    log.info("Resolving Zone ID")
    req = request.Request(
        f"{API_HOST}/zones?name={ZONE_NAME}",
        headers={
            "Authorization": f"Bearer {ACCOUNT_TOKEN}",
            "X-Auth-Email": ACCOUNT_EMAIL,
            "Content-Type": "application/json",
        },
    )
    log.debug(f"Resolving Zone ID -> {req.full_url}")

    # The context manager closes the connection even if read() raises.
    with request.urlopen(req, timeout=timeout) as resp:
        resp_data = resp.read()
    log.debug(f"Bare response -> {resp_data}")

    json_data = json.loads(resp_data.decode())
    zones = json_data.get("result") or []

    # Explicit check instead of `assert`: assertions are stripped under
    # `python -O`, and an empty result list should yield a readable error
    # rather than a bare IndexError.
    data = next((zone for zone in zones if zone.get("name") == ZONE_NAME), None)
    if data is None:
        raise LookupError(f"No zone named '{ZONE_NAME}' found")

    zone_id = data["id"]
    log.info(f"Resolved Zone Identifier: '{zone_id}'")
    return zone_id
def resolve_record(zone_id: str, timeout: float = 10.0):
    """Look up the A record named ``RECORD_NAME`` inside a zone.

    Args:
        zone_id: Identifier of the zone to search (as returned by
            ``resolve_zone``).
        timeout: Seconds to wait on the network before giving up.

    Returns:
        A ``(record_id, record_content)`` tuple: the record's ``id`` and its
        current ``content`` value.

    Raises:
        LookupError: If the API response contains no A record named
            ``RECORD_NAME``.
        urllib.error.URLError: If the request fails (``HTTPError`` for
            non-success HTTP statuses).
    """
    log.info(f"Resolving record with zone id '{zone_id}'")
    req = request.Request(
        # Filter on type=A as well: other record types (AAAA, TXT, MX, ...)
        # may share the same name, and picking one of them would make the
        # later update overwrite the wrong record.
        f"{API_HOST}/zones/{zone_id}/dns_records?name={RECORD_NAME}&type=A",
        headers={
            "Authorization": f"Bearer {ACCOUNT_TOKEN}",
            "X-Auth-Email": ACCOUNT_EMAIL,
            "Content-Type": "application/json",
        },
    )
    log.debug(f"Resolving Record ID -> {req.full_url}")

    # The context manager closes the connection even if read() raises.
    with request.urlopen(req, timeout=timeout) as resp:
        resp_data = resp.read()
    log.debug(f"Bare response -> {resp_data}")

    json_data = json.loads(resp_data.decode())
    records = json_data.get("result") or []

    # Explicit check instead of `assert`: assertions are stripped under
    # `python -O`, and an empty result list should yield a readable error
    # rather than a bare IndexError.
    data = next(
        (
            rec
            for rec in records
            if rec.get("name") == RECORD_NAME and rec.get("type") == "A"
        ),
        None,
    )
    if data is None:
        raise LookupError(
            f"No A record named '{RECORD_NAME}' found in zone '{zone_id}'"
        )

    record_id = data["id"]
    record_content = data["content"]
    log.info(f"Resolved Record Identifier: '{record_id}'")
    log.info(f"Resolved Record IP: '{record_content}'")
    return record_id, record_content
def set_ip(zone_id: str, record_id: str, ip: str, timeout: float = 10.0):
    """Point the A record ``RECORD_NAME`` at ``ip``.

    Args:
        zone_id: Identifier of the zone that owns the record.
        record_id: Identifier of the DNS record to update.
        ip: Address to store as the record's content.
        timeout: Seconds to wait on the network before giving up.

    Raises:
        urllib.error.URLError: If the request fails (``HTTPError`` for
            non-success HTTP statuses).
    """
    log.info(f"Updating IP as '{ip}'")
    payload = json.dumps({
        "name": RECORD_NAME,
        "type": "A",
        "content": ip,
    }).encode()
    req = request.Request(
        f"{API_HOST}/zones/{zone_id}/dns_records/{record_id}",
        # PATCH changes only the fields sent in the payload. PUT overwrites
        # the whole record, so settings absent from the payload (such as TTL
        # or proxy status) would be reset on every update.
        method="PATCH",
        data=payload,
        headers={
            "Authorization": f"Bearer {ACCOUNT_TOKEN}",
            "X-Auth-Email": ACCOUNT_EMAIL,
            "Content-Type": "application/json",
        },
    )
    log.debug(f"Updating IP host -> {req.full_url}")

    # The context manager closes the connection even if read() raises.
    with request.urlopen(req, timeout=timeout) as resp:
        resp_data = resp.read()
    log.debug(f"Bare response -> {resp_data}")
def main():
    """Sync the DNS record with the current external IP.

    Resolves the external IP, the zone and the record, then updates the
    record only when its content differs from the resolved IP.

    HTTP and URL errors are logged and swallowed, so a scheduled run ends
    quietly and the next run retries. Any other exception is logged with its
    traceback (so it reaches the log file as well as the console) and then
    re-raised unchanged.
    """
    try:
        ip = get_ip()
        zone_id = resolve_zone()
        record_id, record_ip = resolve_record(zone_id)

        if ip != record_ip:
            log.info(f"Update IP {record_ip} -> {ip}")
            set_ip(zone_id, record_id, ip)
        else:
            log.info(f"IP unchanged ({ip}); nothing to update")

    # HTTPError is a subclass of URLError, so it must be handled first.
    except urllib.error.HTTPError as e:
        error_reason = f"{e.code} {e.reason}"
        # The error body is only used for diagnostics; never let an
        # undecodable byte mask the original HTTP failure.
        resp_data = e.read().decode(errors="replace")
        log.error(f"Error on fetching http with reason '{error_reason}' -> {resp_data}")

    except urllib.error.URLError as e:
        error_reason = f"{e.reason}"
        log.error(f"Error on URL with reason '{error_reason}'")

    except Exception:
        # Record unexpected failures (e.g. lookup or JSON errors) in the log
        # before propagating them, since a scheduled run has no terminal.
        log.exception("Unexpected error while updating the DNS record")
        raise


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment