Skip to content

Instantly share code, notes, and snippets.

@synio-wesley
Last active April 9, 2023 22:34
Show Gist options
  • Save synio-wesley/171f1716f487126568f168a399740e20 to your computer and use it in GitHub Desktop.
Save synio-wesley/171f1716f487126568f168a399740e20 to your computer and use it in GitHub Desktop.
Sync CloudFlare DNS based on JSON file
"""
This is a simple Python 3 script to sync all CloudFlare DNS records based on a JSON file.
Basically:
- It adds new domains if they aren't in CloudFlare yet
- For every domain: new DNS records will be added, existing records will be updated and records not existing in the JSON will be deleted
By default the script only does a dry run (simulation). Run the script with parameter "execute" to execute it.
If you only want to add/update some domains, fill in the "only" key in the JSON file with an array of domains you want to add/update.
It can save time if you have a lot of domains in the JSON file.
All common record types should be supported (A, AAAA, CNAME, MX, TXT/SPF, SRV).
You can create DNS templates for your records, and also define overrides/overwrites based on those templates for some domains.
An example JSON can be viewed here: https://gist.github.com/synio-wesley/ffd2ceb0f5abec10e9b865f6420ca804
A few small limitations currently:
- The file must be named domains.json and is read from the current working directory (can of course be modified easily to use a parameter if you want)
- SPF records need to be defined as SPF instead of TXT in the JSON
- Records that share the same type and name (for example multiple MX records) are all updated together whenever one of them changes
- Records that share the same type and name (for example multiple MX records) must be sorted by priority and/or value in the JSON file; otherwise they will be updated on every run
"""
# CloudFlare credentials and API endpoint.
# Each credential is read from the environment variable of the same name so
# that secrets do not have to be stored in this file; when the variable is not
# set, the value written below ("?" placeholder) is used instead.
import os

CF_EMAIL = os.environ.get("CF_EMAIL", "?")
CF_APIKEY = os.environ.get("CF_APIKEY", "?")
CF_ACCOUNTID = os.environ.get("CF_ACCOUNTID", "?")
CF_API_URL = "https://api.cloudflare.com/client/v4/"
# Headers sent with every API request (e-mail + API key authentication).
CF_API_HEADERS = {
    'X-Auth-Email': CF_EMAIL,
    'X-Auth-Key': CF_APIKEY,
    'Content-Type': 'application/json',
}
import json
import requests
import sys
from copy import deepcopy
from itertools import zip_longest
class DictDiffer:
    """
    Compare a "current" dictionary against a "past" one, key by key.

    Every query method returns a set of keys:
    (1) added()     - keys present only in the current dictionary
    (2) removed()   - keys present only in the past dictionary
    (3) changed()   - keys present in both whose values differ
    (4) unchanged() - keys present in both whose values are equal
    """

    def __init__(self, current_dict, past_dict):
        self.current_dict = current_dict
        self.past_dict = past_dict
        self.set_current = set(current_dict)
        self.set_past = set(past_dict)
        # Keys the two dictionaries have in common.
        self.intersect = self.set_current & self.set_past

    def added(self):
        return self.set_current - self.intersect

    def removed(self):
        return self.set_past - self.intersect

    def changed(self):
        return {
            key for key in self.intersect
            if self.past_dict[key] != self.current_dict[key]
        }

    def unchanged(self):
        return {
            key for key in self.intersect
            if self.past_dict[key] == self.current_dict[key]
        }
def pagedGet(path):
    """
    Fetch every page of a paginated CloudFlare API listing.

    path: API path relative to CF_API_URL (for example 'zones').

    Returns a list with the "result" entries of all pages, or None as soon as
    any page request does not answer with HTTP 200. None is returned instead
    of the entries gathered so far so that callers never treat an incomplete
    listing as the full state of the account; the failure is reported on
    stderr.
    """
    results = []
    page = 1
    while True:
        # Passing the page number through "params" lets requests build the
        # query string.
        response = requests.get(CF_API_URL + path, params={'page': page}, headers=CF_API_HEADERS)
        if response.status_code != 200:
            print("CloudFlare request for " + path + " (page " + str(page) + ") failed with HTTP status "
                  + str(response.status_code), file=sys.stderr)
            return None
        data = json.loads(response.content.decode('utf-8'))
        results.extend(data["result"])
        info = data["result_info"]
        # Stop once the pages fetched so far cover the reported total.
        if info["per_page"] * page >= info["total_count"]:
            return results
        page += 1
def getCfDomains():
    """
    Return a {zone name: zone id} dictionary built from the 'zones' listing.

    Returns None when pagedGet yields nothing (no result or an empty list).
    """
    zones = pagedGet('zones')
    if not zones:
        return None
    return {zone["name"]: zone["id"] for zone in zones}
def getJsonDomains():
    """
    Return the list of domain names from the JSON configuration to process.

    When the optional "only" key of the configuration holds a non-empty list,
    only the configured domains that also appear in that list are returned
    (in configuration order). When "only" is missing or empty, every
    configured domain is returned.
    """
    configured = jsonData["domains"]
    # "only" is optional: a missing key behaves like an empty list.
    only = jsonData.get("only")
    if not only:
        return list(configured)
    return [domain for domain in configured if domain in only]
def addCfDomain(domain):
    """
    Create a zone for `domain` in the account identified by CF_ACCOUNTID.

    In simulation mode nothing is sent and True is returned. Otherwise the
    "success" value of the API response is returned for an HTTP 200 answer,
    and None for any other status code.
    """
    print("Adding " + domain + "...")
    if SIMULATE:
        print("Just simulating...")
        return True
    sys.stdout.flush()
    payload = json.dumps({
        'name': domain,
        'account': {
            'id': CF_ACCOUNTID
        },
    })
    response = requests.post(CF_API_URL + 'zones', headers=CF_API_HEADERS, data=payload)
    if response.status_code != 200:
        return None
    return json.loads(response.content.decode('utf-8'))['success']
def transformCfValue(record):
    """
    Convert a CloudFlare record into the value format used for comparison.

    MX and SRV records become [lower-cased content, priority]; for SRV, tabs
    in the content are replaced by spaces first. Every other record type
    returns its content unchanged.
    """
    kind = record["type"]
    content = record["content"]
    if kind == 'MX':
        return [content.lower(), record["priority"]]
    if kind == 'SRV':
        return [content.replace("\t", " ").lower(), record["priority"]]
    return content
def transformCfType(record):
    """
    Return the record type used for comparison with the JSON configuration.

    A TXT record whose content starts with "v=spf1" (case-insensitive) is
    reported as "SPF"; every other record keeps its own type.
    """
    kind = record["type"]
    isSpf = kind == "TXT" and record["content"].lower().startswith("v=spf1")
    return "SPF" if isSpf else kind
def getCfRecords(domain, id):
    """
    Retrieve the DNS records of one zone, grouped by "TYPE/name" key.

    domain: zone name, used only for the progress message.
    id: identifier of the zone whose records are listed.

    Returns a tuple of two dictionaries sharing the same keys and ordering:
    - key -> list of {'type', 'name', 'value', 'ttl'} records in the
      normalized form produced by transformCfType / transformCfValue
    - key -> list of {'id', 'data'} entries holding the record id and the
      record exactly as returned by the API
    Returns (None, None) when pagedGet returns None.
    """
    print("Retrieving CF DNS records for " + domain + "...")
    sys.stdout.flush()
    fetched = pagedGet('zones/' + id + '/dns_records')
    if fetched is None:
        return (None, None)
    normalized = {}
    originals = {}
    for entry in fetched:
        kind = transformCfType(entry)
        key = kind + "/" + entry["name"]
        normalized.setdefault(key, []).append({
            'type': kind,
            'name': entry["name"],
            'value': transformCfValue(entry),
            'ttl': entry["ttl"],
        })
        originals.setdefault(key, []).append({
            'id': entry["id"],
            'data': entry
        })
    return (normalized, originals)
def getJsonRecords(domain):
    """
    Build the desired DNS records of `domain` from the JSON configuration.

    Records are collected in this order: copies of the domain's template
    records, minus the domain's "remove" entries ("TYPE/name" keys), then for
    each listed overwrite its "remove" types are dropped and its "add"
    records appended, and finally the domain's own "add" records are appended.
    Afterwards "@" in a record name is stripped and the domain is appended
    (as the bare domain for an empty name, ".domain" otherwise). "@" in the
    value of A/CNAME records and in the first value element of MX/SRV records
    is replaced by the domain, and that value is lower-cased.

    Returns a dictionary mapping "TYPE/name" to the list of matching records.
    """
    config = jsonData["domains"][domain]
    records = []
    if "template" in config:
        template = jsonData["templates"][config["template"]]
        records.extend(deepcopy(template["records"]))
    for removeKey in config.get("remove", []):
        keyParts = removeKey.split("/")
        removeType, removeName = keyParts[0], keyParts[1]
        records = [r for r in records if not (r["type"] == removeType and r["name"] == removeName)]
    for overwriteKey in config.get("overwrites", []):
        overwrite = jsonData["overwrites"][overwriteKey]
        for removeType in overwrite.get("remove", []):
            records = [r for r in records if r["type"] != removeType]
        if "add" in overwrite:
            records.extend(deepcopy(overwrite["add"]))
    if "add" in config:
        records.extend(deepcopy(config["add"]))

    # Expand the "@" shorthand and make names fully qualified.
    for record in records:
        bareName = record["name"].replace("@", "")
        record["name"] = bareName + ("." + domain if bareName else domain)
        if record["type"] in ("A", "CNAME"):
            record["value"] = record["value"].replace("@", domain).lower()
        elif record["type"] in ("MX", "SRV"):
            record["value"][0] = record["value"][0].replace("@", domain).lower()

    grouped = {}
    for record in records:
        grouped.setdefault(record["type"] + "/" + record["name"], []).append(record)
    return grouped
def _buildCfRecordPayload(record):
    """
    Translate a normalized record into the request body for the DNS records API.

    record: dictionary with 'type', 'name', 'value' and 'ttl'. For MX and SRV
    records 'value' is a [content, priority] pair; "SPF" records are sent as
    TXT. For SRV, the name is split on "." into service, proto and the
    remaining name, and the content is split on single spaces into weight,
    port and target.
    """
    recordType = 'TXT' if record['type'] == 'SPF' else record['type']
    name = record['name']
    ttl = record['ttl']
    if recordType == 'SRV':
        content = record['value'][0]
        priority = record['value'][1]
        nameParts = name.split('.')
        contentParts = content.split(' ')
        return {
            'type': recordType,
            'data': {
                'name': '.'.join(nameParts[2:]),
                'weight': int(contentParts[0]),
                'priority': int(priority),
                'target': contentParts[2],
                'service': nameParts[0],
                'proto': nameParts[1],
                'port': int(contentParts[1]),
                'ttl': ttl
            },
            'ttl': ttl
        }
    payload = {
        'type': recordType,
        'name': name,
        'content': record['value'],
        'ttl': ttl,
        'proxied': False
    }
    if recordType == 'MX':
        payload['content'] = record['value'][0]
        priority = record['value'][1]
        # Compare against None: a priority of 0 is falsy but must still be sent.
        if priority is not None:
            payload['priority'] = priority
    return payload


def _cfRequestSucceeded(response):
    """Return the "success" value of an HTTP 200 API response, False for any other status."""
    if response.status_code != 200:
        return False
    return json.loads(response.content.decode('utf-8'))['success']


def updateCfRecord(zoneId, recordId, record, domain):
    """
    Overwrite the DNS record `recordId` of zone `zoneId` with `record`.

    domain is used only for the progress message. The request body is built
    before the simulation check, so a malformed record also raises during a
    dry run. Returns True in simulation mode, otherwise the API's "success"
    value (False when the HTTP status is not 200).
    """
    payload = _buildCfRecordPayload(record)
    print("Updating " + domain + ": " + record['type'] + '/' + record['name'] + "...")
    sys.stdout.flush()
    if SIMULATE:
        print("Just simulating...")
        return True
    response = requests.put(CF_API_URL + 'zones/' + zoneId + '/dns_records/' + recordId,
                            headers=CF_API_HEADERS, data=json.dumps(payload))
    return _cfRequestSucceeded(response)


def addCfRecord(zoneId, record, domain):
    """
    Create `record` as a new DNS record in zone `zoneId`.

    domain is used only for the progress message. The request body is built
    before the simulation check, so a malformed record also raises during a
    dry run. Returns True in simulation mode, otherwise the API's "success"
    value (False when the HTTP status is not 200).
    """
    payload = _buildCfRecordPayload(record)
    print("Adding " + domain + ": " + record['type'] + '/' + record['name'] + "...")
    sys.stdout.flush()
    if SIMULATE:
        print("Just simulating...")
        return True
    response = requests.post(CF_API_URL + 'zones/' + zoneId + '/dns_records',
                             headers=CF_API_HEADERS, data=json.dumps(payload))
    return _cfRequestSucceeded(response)
def deleteCfRecord(zoneId, recordId, record, domain):
    """
    Delete the DNS record `recordId` from zone `zoneId`.

    record and domain are used only for the progress message. Returns True in
    simulation mode; otherwise the "success" value of the API response for an
    HTTP 200 answer, and None for any other status code.
    """
    print("Deleting " + domain + ": " + record['type'] + '/' + record['name'] + "...")
    sys.stdout.flush()
    if SIMULATE:
        print("Just simulating...")
        return True
    endpoint = CF_API_URL + 'zones/' + zoneId + '/dns_records/' + recordId
    response = requests.delete(endpoint, headers=CF_API_HEADERS)
    if response.status_code != 200:
        return None
    return json.loads(response.content.decode('utf-8'))['success']
# Script body: load the JSON configuration and synchronize CloudFlare with it.

# Dry run unless the script is called with the single argument "execute".
SIMULATE = not (len(sys.argv) == 2 and sys.argv[1] == 'execute')

with open('domains.json', encoding='utf-8') as f:
    jsonData = json.load(f)


def _printResult(ok):
    """Print the outcome of one API operation and flush so progress shows immediately."""
    print("OK." if ok else "Failed.")
    sys.stdout.flush()


cfDomains = getCfDomains()
jsonDomains = getJsonDomains()
if cfDomains and jsonDomains:
    # Add missing domains to CF (sorted so the output order is reproducible).
    missingDomains = sorted(set(jsonDomains) - set(cfDomains))
    for domain in missingDomains:
        _printResult(addCfDomain(domain))
    if missingDomains:
        # Refresh the zone list to learn the ids of the zones just added; keep
        # the previous mapping if the refresh yields nothing.
        cfDomains = getCfDomains() or cfDomains

    # Update DNS records
    for domain in jsonDomains:
        if domain not in cfDomains:
            continue
        zoneId = cfDomains[domain]
        cfRecords, cfRecordIds = getCfRecords(domain, zoneId)
        jsonRecords = getJsonRecords(domain)
        if cfRecords is None or jsonRecords is None:
            continue
        d = DictDiffer(jsonRecords, cfRecords)

        # Keys that exist only in CloudFlare: delete all of their records.
        for rkey in sorted(d.removed()):
            for record, recordInfo in zip(cfRecords[rkey], cfRecordIds[rkey]):
                _printResult(deleteCfRecord(zoneId, recordInfo["id"], record, domain))

        # Keys present on both sides with different content: pair the records
        # up by position and update them one by one.
        for rkey in sorted(d.changed()):
            existingRecords = cfRecords[rkey]
            pairs = zip_longest(jsonRecords[rkey], cfRecordIds[rkey], fillvalue=None)
            for index, (record, recordInfo) in enumerate(pairs):
                if recordInfo is None:
                    # More records in the JSON than in CloudFlare: add the extra ones.
                    _printResult(addCfRecord(zoneId, record, domain))
                elif record is None:
                    # More records in CloudFlare than in the JSON: delete the surplus.
                    _printResult(deleteCfRecord(zoneId, recordInfo["id"], existingRecords[index], domain))
                else:
                    _printResult(updateCfRecord(zoneId, recordInfo["id"], record, domain))

        # Keys that exist only in the JSON: add all of their records.
        for rkey in sorted(d.added()):
            for record in jsonRecords[rkey]:
                _printResult(addCfRecord(zoneId, record, domain))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment