Skip to content

Instantly share code, notes, and snippets.

@Steve-Tech
Created July 1, 2021 13:25
Show Gist options
  • Save Steve-Tech/f4277b21dc2f4b502ebc2a10d551c1ea to your computer and use it in GitHub Desktop.
Save Steve-Tech/f4277b21dc2f4b502ebc2a10d551c1ea to your computer and use it in GitHub Desktop.
Cloudflare DDNS with UPnP support
#!/usr/bin/python3
#
# Original Code: https://github.com/timothymiller/cloudflare-ddns
# Cloudflare DDNS with UPnP Support
# Tested on a Telstra Smart Modem; other modems will most likely need different settings.
# The upnpclient package could possibly be used to discover the router's location/URL and related details.
#
import requests, json, sys, signal, os, time, threading
from xml.etree import ElementTree
def upnpIP(url="http://192.168.0.1:5000", timeout=5):
    """Ask the router for its external IP address over UPnP (SOAP).

    Posts a WANIPConnection:2 ``GetExternalIPAddress`` request to ``url``
    and returns the text of the ``NewExternalIPAddress`` element found in
    the reply.

    Args:
        url: Address the SOAP request is posted to. The default is the
            value this script was originally hard-coded with.
        timeout: Seconds to wait for the router before giving up, so an
            unreachable router cannot block the caller forever.

    Returns:
        The text of ``NewExternalIPAddress`` (``None`` if the element is
        present but empty).

    Raises:
        requests.exceptions.RequestException: On connection errors,
            timeouts, or an HTTP error status.
        xml.etree.ElementTree.ParseError: If the reply is not valid XML.
        ValueError: If the reply has no ``NewExternalIPAddress`` element.
    """
    soap_ns = "http://schemas.xmlsoap.org/soap/envelope/"
    service_ns = "urn:schemas-upnp-org:service:WANIPConnection:2"
    body = '''<?xml version='1.0' encoding='utf-8'?>
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><SOAP-ENV:Body><m:GetExternalIPAddress xmlns:m="urn:schemas-upnp-org:service:WANIPConnection:2"/></SOAP-ENV:Body></SOAP-ENV:Envelope>'''
    # Host and Content-Length are filled in by requests from the URL and
    # the encoded body, so they are not set by hand here.
    headers = {
        "SOAPAction": '"WANIPConnection:2#GetExternalIPAddress"',
        "Content-Type": "text/xml",
    }
    response = requests.post(
        url, data=body.encode("utf-8"), headers=headers, timeout=timeout)
    # Fail loudly on an HTTP error instead of trying to parse an error page.
    response.raise_for_status()
    tree = ElementTree.fromstring(response.text)
    element = tree.find(
        "./{" + soap_ns + "}Body"
        "/{" + service_ns + "}GetExternalIPAddressResponse"
        "/NewExternalIPAddress")
    if element is None:
        raise ValueError("UPnP response did not contain NewExternalIPAddress")
    return element.text
class GracefulExit:
    """Record SIGINT/SIGTERM in an event so a loop can stop cleanly."""

    def __init__(self):
        # Set by exit_gracefully once either signal has been delivered.
        self.kill_now = threading.Event()
        for handled_signal in (signal.SIGINT, signal.SIGTERM):
            signal.signal(handled_signal, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        """Signal handler: announce the shutdown and set ``kill_now``."""
        print("๐Ÿ›‘ Stopping main thread...")
        self.kill_now.set()
def deleteEntries(type):
    """Delete the DNS records of one type from every configured zone.

    Helper for the case where no IPv4 or IPv6 connection is detected, yet
    existing A or AAAA records are found.

    Each zone in ``config["cloudflare"]`` is handled independently: its
    records of the requested type are listed and then deleted one by one.
    If the listing for a zone fails, that zone is skipped after a short
    pause and the remaining zones are still processed.

    Args:
        type: DNS record type to remove, e.g. ``"A"`` or ``"AAAA"``.
    """
    for option in config["cloudflare"]:
        records_path = "zones/" + option['zone_id'] + "/dns_records"
        # Only the first page (up to 100 records) is requested.
        answer = cf_api(
            records_path + "?per_page=100&type=" + type, "GET", option)
        if answer is None or answer.get("result") is None:
            # Listing failed: pause briefly, then move on to the next zone
            # rather than abandoning the whole clean-up.
            time.sleep(5)
            continue
        for record in answer["result"]:
            identifier = str(record["id"])
            deleted = cf_api(records_path + "/" + identifier, "DELETE", option)
            # cf_api returns None on failure; only report real deletions.
            if deleted is not None:
                print("๐Ÿ—‘๏ธ Deleted stale record " + identifier)
def getIPs():
    """Collect the addresses that should be published.

    When IPv4 is enabled the external address is requested via upnpIP().
    If that call raises, a one-time warning is printed and deleteEntries
    is called for "A" records.

    Returns:
        A dict that is empty when no address was obtained, otherwise
        ``{"ipv4": {"type": "A", "ip": <address>}}``.
    """
    global ipv4_enabled
    global shown_ipv4_warning

    address = None
    if ipv4_enabled:
        try:
            address = upnpIP()
        except Exception:
            # Warn only once, but run the clean-up on every failure.
            if not shown_ipv4_warning:
                shown_ipv4_warning = True
                print("๐Ÿงฉ IPv4 not detected")
            deleteEntries("A")

    if address is None:
        return {}
    return {"ipv4": {"type": "A", "ip": address}}
def commitRecord(ip):
    """Create or update the DNS record for ``ip`` in every configured zone.

    For each zone in ``config["cloudflare"]`` and each of its subdomains:
    the existing records of the same type are listed, one record matching
    the fully qualified name is kept (and updated if its content or proxied
    flag differs), a new record is added if none exists, and any other
    records with the same name are deleted as stale duplicates.

    A zone whose details cannot be fetched, or a subdomain whose records
    cannot be listed, is skipped; the remaining ones are still processed.

    Args:
        ip: Dict with the keys ``"type"`` (record type) and ``"ip"``
            (record content), as produced by getIPs().

    Returns:
        ``True`` when every zone and subdomain could be processed,
        otherwise ``None``.
    """
    all_processed = True
    ttl = 300  # default Cloudflare TTL
    for option in config["cloudflare"]:
        zone_path = "zones/" + option['zone_id']
        response = cf_api(zone_path, "GET", option)
        # Tolerate a failed call (None) as well as a null "result".
        base_domain_name = ((response or {}).get("result") or {}).get("name")
        if base_domain_name is None:
            all_processed = False
            time.sleep(5)
            continue
        for subdomain in option["subdomains"]:
            subdomain = subdomain.lower().strip()
            # An empty name or "@" refers to the zone's root domain.
            fqdn = base_domain_name
            if subdomain and subdomain != "@":
                fqdn = subdomain + "." + base_domain_name
            record = {
                "type": ip["type"],
                # Send the full name so the root domain is never sent as "".
                "name": fqdn,
                "content": ip["ip"],
                "proxied": option["proxied"],
                "ttl": ttl
            }
            dns_records = cf_api(
                zone_path + "/dns_records?per_page=100&type=" + ip["type"],
                "GET", option)
            if dns_records is None:
                # Without the listing we cannot tell whether the record
                # exists; adding blindly would risk creating a duplicate.
                all_processed = False
                continue
            matches = [
                r for r in (dns_records.get("result") or [])
                if r["name"] == fqdn
            ]
            # Keep the first match, unless a later one already holds the
            # right content, in which case that one is kept instead.
            keeper = None
            for r in matches:
                if keeper is None or r["content"] == ip["ip"]:
                    keeper = r
            if keeper is None:
                print("โž• Adding new record " + str(record))
                cf_api(zone_path + "/dns_records", "POST", option, {}, record)
            elif (keeper["content"] != record["content"]
                    or keeper["proxied"] != record["proxied"]):
                # Judge "modified" on the record actually kept, not on
                # whichever match happened to come first.
                print("๐Ÿ“ก Updating record " + str(record))
                cf_api(
                    zone_path + "/dns_records/" + keeper["id"],
                    "PUT", option, {}, record)
            for r in matches:
                if r is keeper:
                    continue
                stale_id = str(r["id"])
                print("๐Ÿ—‘๏ธ Deleting stale record " + stale_id)
                cf_api(zone_path + "/dns_records/" + stale_id, "DELETE", option)
    return True if all_processed else None
def cf_api(endpoint, method, config, headers=None, data=False, timeout=30):
    """Send one request to the Cloudflare v4 API and return the parsed reply.

    Authentication uses the API token from ``config['authentication']``
    when one is set (and is not the ``'api_token_here'`` placeholder);
    otherwise the account email / API key pair is used.

    Args:
        endpoint: Path appended to ``https://api.cloudflare.com/client/v4/``.
        method: HTTP method, e.g. ``"GET"``, ``"POST"``, ``"PUT"``, ``"DELETE"``.
        config: One zone's configuration dict containing ``authentication``.
        headers: Optional extra headers; they are merged over the
            authentication headers for both authentication modes.
        data: JSON-serialisable request body, or ``False``/``None`` for
            a request without a body.
        timeout: Seconds to wait for Cloudflare before giving up.

    Returns:
        The decoded JSON response on success, or ``None`` if the request
        could not be sent, returned an error status, or did not contain
        valid JSON. Failures are printed rather than raised.
    """
    authentication = config['authentication']
    api_token = authentication.get('api_token') or ''
    if api_token != '' and api_token != 'api_token_here':
        auth_headers = {"Authorization": "Bearer " + api_token}
    else:
        auth_headers = {
            "X-Auth-Email": authentication['api_key']['account_email'],
            "X-Auth-Key": authentication['api_key']['api_key'],
        }
    request_headers = {**auth_headers, **(headers or {})}

    url = "https://api.cloudflare.com/client/v4/" + endpoint
    request_kwargs = {"headers": request_headers, "timeout": timeout}
    if data is not False and data is not None:
        request_kwargs["json"] = data

    try:
        response = requests.request(method, url, **request_kwargs)
    except requests.exceptions.RequestException as error:
        # Callers treat None as "request failed", so report network
        # problems the same way instead of letting them propagate.
        print("๐Ÿ“ˆ Error sending '" + method + "' request to '" + url + "':")
        print(error)
        return None

    if not response.ok:
        print("๐Ÿ“ˆ Error sending '" + method + "' request to '" + response.url + "':")
        print(response.text)
        return None

    try:
        return response.json()
    except ValueError as error:
        # A success status whose body is not valid JSON.
        print("๐Ÿ“ˆ Error sending '" + method + "' request to '" + response.url + "':")
        print(error)
        return None
def updateIPs(ips):
    """Pass each value of ``ips`` to commitRecord, one at a time.

    Args:
        ips: Mapping whose values are handed to commitRecord; the keys
            are not used.
    """
    for key in ips:
        commitRecord(ips[key])
if __name__ == '__main__':
    # Script entry point: load config.json from the working directory and
    # either update the records once, or (with --repeat) keep polling the
    # router and update whenever the reported address changes.

    # Compare version tuples: building a float from "3.10" gives 3.1, which
    # would wrongly reject Python 3.10 and later.
    if sys.version_info < (3, 5):
        raise Exception("๐Ÿ This script requires Python 3.5+")

    PATH = os.getcwd() + "/"
    shown_ipv4_warning = False
    ipv4_enabled = True

    config = None
    try:
        with open(PATH + "config.json", encoding="utf-8") as config_file:
            config = json.load(config_file)
    except (OSError, ValueError):
        # OSError: file missing/unreadable; ValueError: invalid JSON.
        print("๐Ÿ˜ก Error reading config.json")
        time.sleep(60)  # wait 60 seconds to prevent excessive logging on docker auto restart

    if config is not None:
        try:
            ipv4_enabled = config["a"]
        except (KeyError, TypeError):
            ipv4_enabled = True
            print("โš™๏ธ Individually disable IPv4 or IPv6 with new config.json options. Read more about it here: https://github.com/timothymiller/cloudflare-ddns/blob/master/README.md")

        if len(sys.argv) > 1:
            if sys.argv[1] == "--repeat":
                delay = 5
                if ipv4_enabled:
                    print("๐Ÿ•ฐ๏ธ Updating IPv4 (A) records every 5 seconds")
                killer = GracefulExit()
                prev_ip = None
                # wait() returns True once a stop signal has been received.
                while not killer.kill_now.wait(delay):
                    try:
                        current_ip = upnpIP()
                    except Exception:
                        # A router hiccup must not kill the loop; treat it
                        # as "no address" and let getIPs() report it.
                        current_ip = None
                    if current_ip != prev_ip:
                        updateIPs(getIPs())
                        prev_ip = current_ip
            else:
                print("โ“ Unrecognized parameter '" + sys.argv[1] + "'. Stopping now.")
        else:
            updateIPs(getIPs())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment